jsonrpc_v2/
lib.rs

1/*!
2A very small and very fast JSON-RPC 2.0 server-focused framework.
3
4Provides integrations for both `hyper` and `actix-web` (1.x, 2.x, 3.x, 4.x).
5Enable features `actix-web-v3-integration`, `hyper-integration`, etc. as needed.
6
7`actix-web-v4-integration` is enabled by default. Make sure to add `default-features = false` if using `hyper` or other `actix-web` versions.
8
9Also see the `easy-errors` feature flag (not enabled by default). Enabling this flag will implement [`ErrorLike`](https://docs.rs/jsonrpc-v2/*/jsonrpc_v2/trait.ErrorLike.html)
10for anything that implements `Display`, and the display value will be provided in the `message` field of the JSON-RPC 2.0 `Error` response.
11
12Otherwise, custom errors should implement [`ErrorLike`](https://docs.rs/jsonrpc-v2/*/jsonrpc_v2/trait.ErrorLike.html) to map errors to the JSON-RPC 2.0 `Error` response.
13
14Individual method handlers are `async` functions that can take various kinds of args (things that can be extracted from the request, like
15the `Params` or `Data`), and should return a `Result<Item, Error>` where the `Item` is serializable. See examples below.
16
17# Usage
18
19```rust,no_run
20use jsonrpc_v2::{Data, Error, Params, Server};
21
22#[derive(serde::Deserialize)]
23struct TwoNums {
24    a: usize,
25    b: usize,
26}
27
28async fn add(Params(params): Params<TwoNums>) -> Result<usize, Error> {
29    Ok(params.a + params.b)
30}
31
32async fn sub(Params(params): Params<(usize, usize)>) -> Result<usize, Error> {
33    Ok(params.0 - params.1)
34}
35
36async fn message(data: Data<String>) -> Result<String, Error> {
37    Ok(String::from(&*data))
38}
39
40#[actix_rt::main]
41async fn main() -> std::io::Result<()> {
42    let rpc = Server::new()
43        .with_data(Data::new(String::from("Hello!")))
44        .with_method("sub", sub)
45        .with_method("message", message)
46        .finish();
47
48    actix_web::HttpServer::new(move || {
49        let rpc = rpc.clone();
50        actix_web::App::new().service(
51            actix_web::web::service("/api")
52                .guard(actix_web::guard::Post())
53                .finish(rpc.into_web_service()),
54        )
55    })
56    .bind("0.0.0.0:3000")?
57    .run()
58    .await
59}
60```
61*/
62
63use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize, Serializer};
64
65use serde_json::{value::RawValue, Value};
66
67use std::sync::Arc;
68
69use futures::{
70    future::{self, Future, FutureExt},
71    stream::StreamExt,
72};
73
74#[cfg(any(
75    feature = "actix-web-v1",
76    feature = "actix-web-v2",
77    feature = "actix-web-v3",
78    feature = "actix-web-v4"
79))]
80use futures::future::TryFutureExt;
81
82#[cfg(any(feature = "actix-web-v2", feature = "actix-web-v3", feature = "actix-web-v4"))]
83use futures::stream::TryStreamExt;
84
85use extensions::concurrent::Extensions;
86use std::{collections::HashMap, marker::PhantomData};
87
88#[cfg(feature = "bytes-v10")]
89use bytes_v10::Bytes;
90
91#[cfg(feature = "bytes-v05")]
92use bytes_v05::Bytes;
93
94#[cfg(feature = "bytes-v04")]
95use bytes_v04::Bytes;
96
97#[cfg(feature = "macros")]
98pub use jsonrpc_v2_macros::jsonrpc_v2_method;
99
100#[cfg(feature = "hyper-integration")]
101pub use factory::hyper::*;
102
103#[cfg(any(
104    feature = "actix-web-v1",
105    feature = "actix-web-v2",
106    feature = "actix-web-v3",
107    feature = "actix-web-v4"
108))]
109pub use factory::actix::*;
110
111mod factory;
112
113#[cfg(feature = "macros")]
114pub mod exp {
115    pub use serde;
116    pub use serde_json;
117}
118
119type BoxedSerialize = Box<dyn erased_serde::Serialize + Send>;
120
121/// Error object in a response
122#[derive(Serialize)]
123#[serde(untagged)]
124pub enum Error {
125    Full {
126        code: i64,
127        message: String,
128        #[serde(skip_serializing_if = "Option::is_none")]
129        data: Option<BoxedSerialize>,
130    },
131    Provided {
132        code: i64,
133        message: &'static str,
134    },
135}
136
137impl Error {
138    pub const INVALID_REQUEST: Self = Error::Provided { code: -32600, message: "Invalid Request" };
139    pub const METHOD_NOT_FOUND: Self =
140        Error::Provided { code: -32601, message: "Method not found" };
141    pub const INVALID_PARAMS: Self = Error::Provided { code: -32602, message: "Invalid params" };
142    pub const INTERNAL_ERROR: Self = Error::Provided { code: -32603, message: "Internal Error" };
143    pub const PARSE_ERROR: Self = Error::Provided { code: -32700, message: "Parse error" };
144
145    pub fn internal<D: std::fmt::Display + Send>(e: D) -> Self {
146        Error::Full {
147            code: -32603,
148            message: "Internal Error".into(),
149            data: Some(Box::new(e.to_string())),
150        }
151    }
152
153    pub fn invalid_params(e: serde_json::Error) -> Self {
154        Error::Full {
155            code: -32602,
156            message: "Invalid params".into(),
157            data: Some(Box::new(format!("{e:#?}"))),
158        }
159    }
160}
161
162/// Trait that can be used to map custom errors to the [`Error`](enum.Error.html) object.
163pub trait ErrorLike: std::fmt::Display {
164    /// Code to be used in JSON-RPC 2.0 Error object. Default is 0.
165    fn code(&self) -> i64 {
166        0
167    }
168
169    /// Message to be used in JSON-RPC 2.0 Error object. Default is the `Display` value of the item.
170    fn message(&self) -> String {
171        self.to_string()
172    }
173
174    /// Any additional data to be sent with the error. Default is `None`.
175    fn data(&self) -> Option<BoxedSerialize> {
176        None
177    }
178}
179
180impl<T> From<T> for Error
181where
182    T: ErrorLike,
183{
184    fn from(t: T) -> Error {
185        Error::Full { code: t.code(), message: t.message(), data: t.data() }
186    }
187}
188
189#[cfg(feature = "easy-errors")]
190impl<T> ErrorLike for T where T: std::fmt::Display {}
191
192#[doc(hidden)]
193#[derive(Default, Debug)]
194pub struct V2;
195
196impl Serialize for V2 {
197    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
198    where
199        S: Serializer,
200    {
201        "2.0".serialize(serializer)
202    }
203}
204
205impl<'de> Deserialize<'de> for V2 {
206    fn deserialize<D>(deserializer: D) -> Result<V2, D::Error>
207    where
208        D: Deserializer<'de>,
209    {
210        let s: &str = Deserialize::deserialize(deserializer)?;
211        if s == "2.0" {
212            Ok(V2)
213        } else {
214            Err(serde::de::Error::custom("Could not deserialize V2"))
215        }
216    }
217}
218
219/// Container for the request ID, which can be a string, number, or null.
220/// Not typically used directly.
221#[derive(Debug, Clone, Serialize, Deserialize)]
222#[serde(untagged)]
223pub enum Id {
224    Num(i64),
225    Str(Box<str>),
226    Null,
227}
228
229impl From<i64> for Id {
230    fn from(t: i64) -> Self {
231        Id::Num(t)
232    }
233}
234
235impl<'a> From<&'a str> for Id {
236    fn from(t: &'a str) -> Self {
237        Id::Str(t.into())
238    }
239}
240
241impl From<String> for Id {
242    fn from(t: String) -> Self {
243        Id::Str(t.into())
244    }
245}
246
247impl Default for Id {
248    fn default() -> Self {
249        Id::Null
250    }
251}
252
253/// Method string wrapper, for `FromRequest` extraction
254#[derive(Debug)]
255pub struct Method(Box<str>);
256
257impl Method {
258    pub fn as_str(&self) -> &str {
259        &self.0
260    }
261}
262
263impl From<Method> for String {
264    fn from(t: Method) -> String {
265        String::from(t.0)
266    }
267}
268
269/// Builder struct for a request object
270#[derive(Default)]
271pub struct RequestBuilder<M = ()> {
272    id: Id,
273    params: Option<Value>,
274    method: M,
275}
276
277impl<M> RequestBuilder<M> {
278    pub fn with_id<I: Into<Id>>(mut self, id: I) -> Self {
279        self.id = id.into();
280        self
281    }
282
283    pub fn with_params<I: Into<Value>>(mut self, params: I) -> Self {
284        self.params = Some(params.into());
285        self
286    }
287}
288
289impl RequestBuilder<()> {
290    pub fn with_method<I: Into<String>>(self, method: I) -> RequestBuilder<String> {
291        let RequestBuilder { id, params, .. } = self;
292        RequestBuilder { id, params, method: method.into() }
293    }
294}
295
296impl RequestBuilder<String> {
297    pub fn finish(self) -> RequestObject {
298        let RequestBuilder { id, params, method } = self;
299        RequestObject {
300            jsonrpc: V2,
301            method: method.into_boxed_str(),
302            params: params.map(InnerParams::Value),
303            id: Some(Some(id)),
304        }
305    }
306}
307
308/// Builder struct for a notification request object
309#[derive(Default)]
310pub struct NotificationBuilder<M = ()> {
311    params: Option<Value>,
312    method: M,
313}
314
315impl<M> NotificationBuilder<M> {
316    pub fn with_params<I: Into<Value>>(mut self, params: I) -> Self {
317        self.params = Some(params.into());
318        self
319    }
320}
321
322impl NotificationBuilder<()> {
323    pub fn with_method<I: Into<String>>(self, method: I) -> NotificationBuilder<String> {
324        let NotificationBuilder { params, .. } = self;
325        NotificationBuilder { params, method: method.into() }
326    }
327}
328
329impl NotificationBuilder<String> {
330    pub fn finish(self) -> RequestObject {
331        let NotificationBuilder { method, params } = self;
332        RequestObject {
333            jsonrpc: V2,
334            method: method.into_boxed_str(),
335            params: params.map(InnerParams::Value),
336            id: None,
337        }
338    }
339}
340
341#[derive(Debug, Serialize, Deserialize)]
342#[serde(untagged)]
343enum InnerParams {
344    Value(Value),
345    Raw(Box<RawValue>),
346}
347
348/// Request/Notification object
349#[derive(Debug, Deserialize, Serialize, Default)]
350#[serde(default)]
351pub struct RequestObject {
352    jsonrpc: V2,
353    method: Box<str>,
354    params: Option<InnerParams>,
355    #[serde(deserialize_with = "RequestObject::deserialize_id")]
356    #[serde(skip_serializing_if = "Option::is_none")]
357    id: Option<Option<Id>>,
358}
359
360impl RequestObject {
361    pub fn method_ref(&self) -> &str {
362        &self.method
363    }
364
365    pub fn id_ref(&self) -> Option<&Id> {
366        self.id.as_ref().and_then(|x| x.as_ref())
367    }
368}
369
370/// Request/Notification object
371#[derive(Debug, Deserialize, Default)]
372#[serde(default)]
373struct BytesRequestObject {
374    jsonrpc: V2,
375    method: Box<str>,
376    params: Option<Box<RawValue>>,
377    #[serde(deserialize_with = "RequestObject::deserialize_id")]
378    #[serde(skip_serializing_if = "Option::is_none")]
379    id: Option<Option<Id>>,
380}
381
382impl From<BytesRequestObject> for RequestObject {
383    fn from(t: BytesRequestObject) -> Self {
384        let BytesRequestObject { jsonrpc, method, params, id } = t;
385        RequestObject { jsonrpc, method, params: params.map(InnerParams::Raw), id }
386    }
387}
388
389impl RequestObject {
390    /// Build a new request object
391    pub fn request() -> RequestBuilder {
392        RequestBuilder::default()
393    }
394
395    /// Build a new notification request object
396    pub fn notification() -> NotificationBuilder {
397        NotificationBuilder::default()
398    }
399
400    fn deserialize_id<'de, D>(deserializer: D) -> Result<Option<Option<Id>>, D::Error>
401    where
402        D: Deserializer<'de>,
403    {
404        Ok(Some(Option::deserialize(deserializer)?))
405    }
406}
407
408#[doc(hidden)]
409pub struct RequestObjectWithData {
410    inner: RequestObject,
411    data: Arc<Extensions>,
412}
413
414/// [`FromRequest`](trait.FromRequest.html) wrapper for request params
415///
416/// Use a tuple to deserialize by-position params
417/// and a map or deserializable struct for by-name params: e.g.
418///
419/// ```rust,no_run
420/// fn handler(params: Params<(i32, String)>) -> Result<String, Error> { /* ... */ }
421/// ```
422#[derive(Deserialize)]
423pub struct Params<T>(pub T);
424
425impl<T> Params<T>
426where
427    T: DeserializeOwned,
428{
429    fn from_request_object(req: &RequestObject) -> Result<Self, Error> {
430        let res = match req.params {
431            Some(InnerParams::Raw(ref value)) => serde_json::from_str(value.get()),
432            Some(InnerParams::Value(ref value)) => serde_json::from_value(value.clone()),
433            None => serde_json::from_value(Value::Null),
434        };
435        res.map(Params).map_err(Error::invalid_params)
436    }
437}
438
439/// A trait to extract data from the request
440#[async_trait::async_trait]
441pub trait FromRequest: Sized {
442    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error>;
443}
444
445#[async_trait::async_trait]
446impl FromRequest for () {
447    async fn from_request(_: &RequestObjectWithData) -> Result<Self, Error> {
448        Ok(())
449    }
450}
451
452/// Data/state storage container
453pub struct Data<T>(pub Arc<T>);
454
455impl<T> Data<T> {
456    pub fn new(t: T) -> Self {
457        Data(Arc::new(t))
458    }
459}
460
461impl<T> std::ops::Deref for Data<T> {
462    type Target = T;
463
464    fn deref(&self) -> &Self::Target {
465        &*self.0
466    }
467}
468
469#[async_trait::async_trait]
470impl<T: Send + Sync + 'static> FromRequest for Data<T> {
471    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
472        let out = req.data.get::<Data<T>>().map(|x| Data(Arc::clone(&x.0))).ok_or_else(|| {
473            Error::internal(format!("Missing data for: `{}`", std::any::type_name::<T>()))
474        })?;
475        Ok(out)
476    }
477}
478
479#[async_trait::async_trait]
480impl<T: DeserializeOwned> FromRequest for Params<T> {
481    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
482        Ok(Self::from_request_object(&req.inner)?)
483    }
484}
485
486#[async_trait::async_trait]
487impl FromRequest for Id {
488    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
489        Ok(req
490            .inner
491            .id
492            .clone()
493            .and_then(|x| x)
494            .ok_or_else(|| Error::internal("No `id` provided"))?)
495    }
496}
497
498#[async_trait::async_trait]
499impl FromRequest for Method {
500    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
501        Ok(Method(req.inner.method.clone()))
502    }
503}
504
505#[async_trait::async_trait]
506impl<T: FromRequest> FromRequest for Option<T> {
507    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
508        Ok(T::from_request(req).await.ok())
509    }
510}
511
512#[async_trait::async_trait]
513impl<T1> FromRequest for (T1,)
514where
515    T1: FromRequest + Send,
516{
517    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
518        Ok((T1::from_request(req).await?,))
519    }
520}
521
522#[async_trait::async_trait]
523impl<T1, T2> FromRequest for (T1, T2)
524where
525    T1: FromRequest + Send,
526    T2: FromRequest + Send,
527{
528    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
529        let (t1, t2) = futures::join!(T1::from_request(req), T2::from_request(req));
530        Ok((t1?, t2?))
531    }
532}
533
534#[async_trait::async_trait]
535impl<T1, T2, T3> FromRequest for (T1, T2, T3)
536where
537    T1: FromRequest + Send,
538    T2: FromRequest + Send,
539    T3: FromRequest + Send,
540{
541    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
542        let (t1, t2, t3) =
543            futures::join!(T1::from_request(req), T2::from_request(req), T3::from_request(req));
544        Ok((t1?, t2?, t3?))
545    }
546}
547
548#[async_trait::async_trait]
549impl<T1, T2, T3, T4> FromRequest for (T1, T2, T3, T4)
550where
551    T1: FromRequest + Send,
552    T2: FromRequest + Send,
553    T3: FromRequest + Send,
554    T4: FromRequest + Send,
555{
556    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
557        let (t1, t2, t3, t4) = futures::join!(
558            T1::from_request(req),
559            T2::from_request(req),
560            T3::from_request(req),
561            T4::from_request(req)
562        );
563        Ok((t1?, t2?, t3?, t4?))
564    }
565}
566
567#[async_trait::async_trait]
568impl<T1, T2, T3, T4, T5> FromRequest for (T1, T2, T3, T4, T5)
569where
570    T1: FromRequest + Send,
571    T2: FromRequest + Send,
572    T3: FromRequest + Send,
573    T4: FromRequest + Send,
574    T5: FromRequest + Send,
575{
576    async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
577        let (t1, t2, t3, t4, t5) = futures::join!(
578            T1::from_request(req),
579            T2::from_request(req),
580            T3::from_request(req),
581            T4::from_request(req),
582            T5::from_request(req)
583        );
584        Ok((t1?, t2?, t3?, t4?, t5?))
585    }
586}
587
588#[doc(hidden)]
589struct Handler<F, S, E, T>
590where
591    F: Factory<S, E, T>,
592{
593    hnd: F,
594    _t: PhantomData<fn() -> (S, E, T)>,
595}
596
597impl<F, S, E, T> Handler<F, S, E, T>
598where
599    F: Factory<S, E, T>,
600{
601    fn new(hnd: F) -> Self {
602        Handler { hnd, _t: PhantomData }
603    }
604}
605
606#[cfg(any(
607    feature = "actix-web-v1",
608    feature = "actix-web-v2",
609    feature = "actix-web-v3",
610    feature = "actix-web-v4"
611))]
612impl<F, S, E, T> From<Handler<F, S, E, T>> for BoxedHandler
613where
614    F: Factory<S, E, T> + 'static + Send + Sync,
615    S: Serialize + Send + 'static,
616    Error: From<E>,
617    E: 'static,
618    T: FromRequest + 'static + Send,
619{
620    fn from(t: Handler<F, S, E, T>) -> BoxedHandler {
621        let hnd = Arc::new(t.hnd);
622
623        let inner = move |req: RequestObjectWithData| {
624            let hnd = Arc::clone(&hnd);
625            Box::pin(async move {
626                let out = {
627                    let param = T::from_request(&req).await?;
628                    hnd.call(param).await?
629                };
630                Ok(Box::new(out) as BoxedSerialize)
631            }) as std::pin::Pin<Box<dyn Future<Output = Result<BoxedSerialize, Error>>>>
632        };
633
634        BoxedHandler(Box::new(inner))
635    }
636}
637
638#[cfg(feature = "hyper-integration")]
639impl<F, S, E, T> From<Handler<F, S, E, T>> for BoxedHandler
640where
641    F: Factory<S, E, T> + 'static + Send + Sync,
642    S: Serialize + Send + 'static,
643    Error: From<E>,
644    E: 'static,
645    T: FromRequest + 'static + Send,
646{
647    fn from(t: Handler<F, S, E, T>) -> BoxedHandler {
648        let hnd = Arc::new(t.hnd);
649
650        let inner = move |req: RequestObjectWithData| {
651            let hnd = Arc::clone(&hnd);
652            Box::pin(async move {
653                let out = {
654                    let param = T::from_request(&req).await?;
655                    hnd.call(param).await?
656                };
657                Ok(Box::new(out) as BoxedSerialize)
658            })
659                as std::pin::Pin<Box<dyn Future<Output = Result<BoxedSerialize, Error>> + Send>>
660        };
661
662        BoxedHandler(Box::new(inner))
663    }
664}
665
666#[cfg(any(
667    feature = "actix-web-v1",
668    feature = "actix-web-v2",
669    feature = "actix-web-v3",
670    feature = "actix-web-v4"
671))]
672pub struct BoxedHandler(
673    Box<
674        dyn Fn(
675                RequestObjectWithData,
676            )
677                -> std::pin::Pin<Box<dyn Future<Output = Result<BoxedSerialize, Error>>>>
678            + Send
679            + Sync,
680    >,
681);
682
683#[cfg(feature = "hyper-integration")]
684pub struct BoxedHandler(
685    Box<
686        dyn Fn(
687                RequestObjectWithData,
688            )
689                -> std::pin::Pin<Box<dyn Future<Output = Result<BoxedSerialize, Error>> + Send>>
690            + Send
691            + Sync,
692    >,
693);
694
695pub struct MapRouter(HashMap<String, BoxedHandler>);
696
697impl Default for MapRouter {
698    fn default() -> Self {
699        MapRouter(HashMap::default())
700    }
701}
702
703pub trait Router: Default {
704    fn get(&self, name: &str) -> Option<&BoxedHandler>;
705    fn insert(&mut self, name: String, handler: BoxedHandler) -> Option<BoxedHandler>;
706}
707
708impl Router for MapRouter {
709    fn get(&self, name: &str) -> Option<&BoxedHandler> {
710        self.0.get(name)
711    }
712    fn insert(&mut self, name: String, handler: BoxedHandler) -> Option<BoxedHandler> {
713        self.0.insert(name, handler)
714    }
715}
716
717/// Server/request handler
718pub struct Server<R> {
719    data: Arc<Extensions>,
720    router: R,
721}
722
723/// Builder used to add methods to a server
724///
725/// Created with `Server::new` or `Server::with_state`
726pub struct ServerBuilder<R> {
727    data: Extensions,
728    router: R,
729}
730
731impl Server<MapRouter> {
732    pub fn new() -> ServerBuilder<MapRouter> {
733        Server::with_router(MapRouter::default())
734    }
735}
736
737impl<R: Router> Server<R> {
738    pub fn with_router(router: R) -> ServerBuilder<R> {
739        ServerBuilder { data: Extensions::new(), router }
740    }
741}
742
743impl<R: Router> ServerBuilder<R> {
744    /// Add a data/state storage container to the server
745    pub fn with_data<T: Send + Sync + 'static>(mut self, data: Data<T>) -> Self {
746        self.data.insert(data);
747        self
748    }
749
750    /// Add a method handler to the server
751    ///
752    /// The method is an async function that takes up to 5 [`FromRequest`](trait.FromRequest.html) items
753    /// and returns a value that can be resolved to a `TryFuture`, where `TryFuture::Ok` is a serializable object, e.g.:
754    ///
755    /// ```rust,no_run
756    /// async fn handle(params: Params<(i32, String)>, data: Data<HashMap<String, String>>) -> Result<String, Error> { /* ... */ }
757    /// ```
758    pub fn with_method<N, S, E, F, T>(mut self, name: N, handler: F) -> Self
759    where
760        N: Into<String>,
761        F: Factory<S, E, T> + Send + Sync + 'static,
762        S: Serialize + Send + 'static,
763        Error: From<E>,
764        E: 'static,
765        T: FromRequest + Send + 'static,
766    {
767        self.router.insert(name.into(), Handler::new(handler).into());
768        self
769    }
770
771    /// Convert the server builder into the finished struct, wrapped in an `Arc`
772    pub fn finish(self) -> Arc<Server<R>> {
773        let ServerBuilder { router, data } = self;
774        Arc::new(Server { router, data: Arc::new(data) })
775    }
776
777    /// Convert the server builder into the finished struct
778    pub fn finish_unwrapped(self) -> Server<R> {
779        let ServerBuilder { router, data } = self;
780        Server { router, data: Arc::new(data) }
781    }
782}
783
784/// The individual response object
785#[derive(Serialize)]
786#[serde(untagged)]
787pub enum ResponseObject {
788    Result { jsonrpc: V2, result: BoxedSerialize, id: Id },
789    Error { jsonrpc: V2, error: Error, id: Id },
790}
791
792impl ResponseObject {
793    fn result(result: BoxedSerialize, id: Id) -> Self {
794        ResponseObject::Result { jsonrpc: V2, result, id }
795    }
796
797    fn error(error: Error, id: Id) -> Self {
798        ResponseObject::Error { jsonrpc: V2, error, id }
799    }
800}
801
802/// Container for the response object(s) or `Empty` for notification request(s)
803#[derive(Serialize)]
804#[serde(untagged)]
805pub enum ResponseObjects {
806    One(ResponseObject),
807    Many(Vec<ResponseObject>),
808    Empty,
809}
810
811#[derive(Serialize)]
812#[serde(untagged)]
813enum ManyResponseObjects {
814    Many(Vec<ResponseObject>),
815    Empty,
816}
817
818#[derive(Serialize)]
819#[serde(untagged)]
820enum SingleResponseObject {
821    One(ResponseObject),
822    Empty,
823}
824
825impl From<ManyResponseObjects> for ResponseObjects {
826    fn from(t: ManyResponseObjects) -> Self {
827        match t {
828            ManyResponseObjects::Many(many) => ResponseObjects::Many(many),
829            ManyResponseObjects::Empty => ResponseObjects::Empty,
830        }
831    }
832}
833
834impl From<SingleResponseObject> for ResponseObjects {
835    fn from(t: SingleResponseObject) -> Self {
836        match t {
837            SingleResponseObject::One(one) => ResponseObjects::One(one),
838            SingleResponseObject::Empty => ResponseObjects::Empty,
839        }
840    }
841}
842
843impl SingleResponseObject {
844    fn result(result: BoxedSerialize, opt_id: Option<Id>) -> Self {
845        opt_id
846            .map(|id| SingleResponseObject::One(ResponseObject::result(result, id)))
847            .unwrap_or_else(|| SingleResponseObject::Empty)
848    }
849
850    fn error(error: Error, opt_id: Option<Id>) -> Self {
851        opt_id
852            .map(|id| SingleResponseObject::One(ResponseObject::error(error, id)))
853            .unwrap_or_else(|| SingleResponseObject::Empty)
854    }
855}
856
857/// An enum to contain the different kinds of possible requests: using the provided
858/// [`RequestObject`](struct.RequestObject.html), an array of `RequestObject`s, or raw bytes.
859///
860/// Typically not use directly, [`Server::handle`](struct.Server.html#method.handle) can take the individual variants
861pub enum RequestKind {
862    RequestObject(RequestObject),
863    ManyRequestObjects(Vec<RequestObject>),
864    Bytes(Bytes),
865}
866
867impl From<RequestObject> for RequestKind {
868    fn from(t: RequestObject) -> Self {
869        RequestKind::RequestObject(t)
870    }
871}
872
873impl From<Vec<RequestObject>> for RequestKind {
874    fn from(t: Vec<RequestObject>) -> Self {
875        RequestKind::ManyRequestObjects(t)
876    }
877}
878
879impl From<Bytes> for RequestKind {
880    fn from(t: Bytes) -> Self {
881        RequestKind::Bytes(t)
882    }
883}
884
885impl<'a> From<&'a [u8]> for RequestKind {
886    fn from(t: &'a [u8]) -> Self {
887        Bytes::from(t.to_vec()).into()
888    }
889}
890
891#[derive(Debug)]
892enum OneOrManyRawValues<'a> {
893    Many(Vec<&'a RawValue>),
894    One(&'a RawValue),
895}
896
897impl<'a> OneOrManyRawValues<'a> {
898    pub fn try_from_slice(slice: &'a [u8]) -> Result<Self, serde_json::Error> {
899        if slice.first() == Some(&b'[') {
900            Ok(OneOrManyRawValues::Many(serde_json::from_slice::<Vec<&RawValue>>(slice)?))
901        } else {
902            Ok(OneOrManyRawValues::One(serde_json::from_slice::<&RawValue>(slice)?))
903        }
904    }
905}
906
907impl<R> Server<R>
908where
909    R: Router + 'static,
910{
911    #[cfg(feature = "actix-web-v1-integration")]
912    fn handle_bytes_compat(
913        &self,
914        bytes: Bytes,
915    ) -> impl futures_v01::Future<Item = ResponseObjects, Error = ()> {
916        self.handle_bytes(bytes).unit_error().boxed().compat()
917    }
918
919    /// Handle requests, and return appropriate responses
920    pub fn handle<I: Into<RequestKind>>(&self, req: I) -> impl Future<Output = ResponseObjects> {
921        match req.into() {
922            RequestKind::Bytes(bytes) => future::Either::Left(self.handle_bytes(bytes)),
923            RequestKind::RequestObject(req) => future::Either::Right(future::Either::Left(
924                self.handle_request_object(req).map(From::from),
925            )),
926            RequestKind::ManyRequestObjects(reqs) => future::Either::Right(future::Either::Right(
927                self.handle_many_request_objects(reqs).map(From::from),
928            )),
929        }
930    }
931
932    fn handle_request_object(
933        &self,
934        req: RequestObject,
935    ) -> impl Future<Output = SingleResponseObject> {
936        let req = RequestObjectWithData { inner: req, data: Arc::clone(&self.data) };
937
938        let opt_id = match req.inner.id {
939            Some(Some(ref id)) => Some(id.clone()),
940            Some(None) => Some(Id::Null),
941            None => None,
942        };
943
944        if let Some(method) = self.router.get(req.inner.method.as_ref()) {
945            let out = (&method.0)(req).then(|res| match res {
946                Ok(val) => future::ready(SingleResponseObject::result(val, opt_id)),
947                Err(e) => future::ready(SingleResponseObject::error(e, opt_id)),
948            });
949            future::Either::Left(out)
950        } else {
951            future::Either::Right(future::ready(SingleResponseObject::error(
952                Error::METHOD_NOT_FOUND,
953                opt_id,
954            )))
955        }
956    }
957
958    fn handle_many_request_objects<I: IntoIterator<Item = RequestObject>>(
959        &self,
960        reqs: I,
961    ) -> impl Future<Output = ManyResponseObjects> {
962        reqs.into_iter()
963            .map(|r| self.handle_request_object(r))
964            .collect::<futures::stream::FuturesUnordered<_>>()
965            .filter_map(|res| async move {
966                match res {
967                    SingleResponseObject::One(r) => Some(r),
968                    _ => None,
969                }
970            })
971            .collect::<Vec<_>>()
972            .map(|vec| {
973                if vec.is_empty() {
974                    ManyResponseObjects::Empty
975                } else {
976                    ManyResponseObjects::Many(vec)
977                }
978            })
979    }
980
981    fn handle_bytes(&self, bytes: Bytes) -> impl Future<Output = ResponseObjects> {
982        if let Ok(raw_values) = OneOrManyRawValues::try_from_slice(bytes.as_ref()) {
983            match raw_values {
984                OneOrManyRawValues::Many(raw_reqs) => {
985                    if raw_reqs.is_empty() {
986                        return future::Either::Left(future::ready(ResponseObjects::One(
987                            ResponseObject::error(Error::INVALID_REQUEST, Id::Null),
988                        )));
989                    }
990
991                    let (okays, errs) = raw_reqs
992                        .into_iter()
993                        .map(|x| {
994                            serde_json::from_str::<BytesRequestObject>(x.get())
995                                .map(RequestObject::from)
996                        })
997                        .partition::<Vec<_>, _>(|x| x.is_ok());
998
999                    let errs = errs
1000                        .into_iter()
1001                        .map(|_| ResponseObject::error(Error::INVALID_REQUEST, Id::Null))
1002                        .collect::<Vec<_>>();
1003
1004                    future::Either::Right(future::Either::Left(
1005                        self.handle_many_request_objects(okays.into_iter().flat_map(|x| x)).map(
1006                            |res| match res {
1007                                ManyResponseObjects::Many(mut many) => {
1008                                    many.extend(errs);
1009                                    ResponseObjects::Many(many)
1010                                }
1011                                ManyResponseObjects::Empty => {
1012                                    if errs.is_empty() {
1013                                        ResponseObjects::Empty
1014                                    } else {
1015                                        ResponseObjects::Many(errs)
1016                                    }
1017                                }
1018                            },
1019                        ),
1020                    ))
1021                }
1022                OneOrManyRawValues::One(raw_req) => {
1023                    match serde_json::from_str::<BytesRequestObject>(raw_req.get())
1024                        .map(RequestObject::from)
1025                    {
1026                        Ok(rn) => future::Either::Right(future::Either::Right(
1027                            self.handle_request_object(rn).map(|res| match res {
1028                                SingleResponseObject::One(r) => ResponseObjects::One(r),
1029                                _ => ResponseObjects::Empty,
1030                            }),
1031                        )),
1032                        Err(_) => future::Either::Left(future::ready(ResponseObjects::One(
1033                            ResponseObject::error(Error::INVALID_REQUEST, Id::Null),
1034                        ))),
1035                    }
1036                }
1037            }
1038        } else {
1039            future::Either::Left(future::ready(ResponseObjects::One(ResponseObject::error(
1040                Error::PARSE_ERROR,
1041                Id::Null,
1042            ))))
1043        }
1044    }
1045
1046    #[cfg(feature = "actix-web-v1-integration")]
1047    /// Converts the server into an `actix-web` compatible `NewService`
1048    pub fn into_actix_web_service(
1049        self: Arc<Self>,
1050    ) -> impl actix_service_v04::NewService<
1051        Request = actix_web_v1::dev::ServiceRequest,
1052        Response = actix_web_v1::dev::ServiceResponse,
1053        Error = actix_web_v1::Error,
1054        Config = (),
1055        InitError = (),
1056    > {
1057        use futures_v01::{Future, Stream};
1058
1059        let service = Arc::clone(&self);
1060
1061        let inner = move |req: actix_web_v1::dev::ServiceRequest| {
1062            let service = Arc::clone(&service);
1063            let (req, payload) = req.into_parts();
1064            let rt = payload
1065                .map_err(actix_web_v1::Error::from)
1066                .fold(actix_web_v1::web::BytesMut::new(), move |mut body, chunk| {
1067                    body.extend_from_slice(&chunk);
1068                    Ok::<_, actix_web_v1::Error>(body)
1069                })
1070                .and_then(move |bytes| {
1071                    service.handle_bytes_compat(bytes.freeze()).then(|res| match res {
1072                        Ok(res_inner) => match res_inner {
1073                            ResponseObjects::Empty => Ok(actix_web_v1::dev::ServiceResponse::new(
1074                                req,
1075                                actix_web_v1::HttpResponse::NoContent().finish(),
1076                            )),
1077                            json => Ok(actix_web_v1::dev::ServiceResponse::new(
1078                                req,
1079                                actix_web_v1::HttpResponse::Ok().json(json),
1080                            )),
1081                        },
1082                        Err(_) => Ok(actix_web_v1::dev::ServiceResponse::new(
1083                            req,
1084                            actix_web_v1::HttpResponse::InternalServerError().into(),
1085                        )),
1086                    })
1087                });
1088            rt
1089        };
1090
1091        actix_service_v04::service_fn::<_, _, _, ()>(inner)
1092    }
1093
1094    #[cfg(feature = "actix-web-v2-integration")]
1095    /// Converts the server into an `actix-web` compatible `NewService`
1096    pub fn into_actix_web_service(
1097        self: Arc<Self>,
1098    ) -> impl actix_service_v1::ServiceFactory<
1099        Request = actix_web_v2::dev::ServiceRequest,
1100        Response = actix_web_v2::dev::ServiceResponse,
1101        Error = actix_web_v2::Error,
1102        Config = (),
1103        InitError = (),
1104    > {
1105        let service = Arc::clone(&self);
1106
1107        let inner = move |req: actix_web_v2::dev::ServiceRequest| {
1108            let service = Arc::clone(&service);
1109            let (req, payload) = req.into_parts();
1110            let rt = payload
1111                .map_err(actix_web_v2::Error::from)
1112                .try_fold(actix_web_v2::web::BytesMut::new(), move |mut body, chunk| async move {
1113                    body.extend_from_slice(&chunk);
1114                    Ok::<_, actix_web_v2::Error>(body)
1115                })
1116                .and_then(move |bytes| {
1117                    service.handle_bytes(bytes.freeze()).map(|res| match res {
1118                        ResponseObjects::Empty => Ok(actix_web_v2::dev::ServiceResponse::new(
1119                            req,
1120                            actix_web_v2::HttpResponse::NoContent().finish(),
1121                        )),
1122                        json => Ok(actix_web_v2::dev::ServiceResponse::new(
1123                            req,
1124                            actix_web_v2::HttpResponse::Ok().json(json),
1125                        )),
1126                    })
1127                });
1128            rt
1129        };
1130
1131        actix_service_v1::fn_service::<_, _, _, _, _, _>(inner)
1132    }
1133
1134    #[cfg(feature = "actix-web-v3-integration")]
1135    /// Converts the server into an `actix-web` compatible `NewService`
1136    pub fn into_actix_web_service(
1137        self: Arc<Self>,
1138    ) -> impl actix_service_v1::ServiceFactory<
1139        Request = actix_web_v3::dev::ServiceRequest,
1140        Response = actix_web_v3::dev::ServiceResponse,
1141        Error = actix_web_v3::Error,
1142        Config = (),
1143        InitError = (),
1144    > {
1145        let service = Arc::clone(&self);
1146
1147        let inner = move |req: actix_web_v3::dev::ServiceRequest| {
1148            let service = Arc::clone(&service);
1149            let (req, payload) = req.into_parts();
1150            let rt = payload
1151                .map_err(actix_web_v3::Error::from)
1152                .try_fold(actix_web_v3::web::BytesMut::new(), move |mut body, chunk| async move {
1153                    body.extend_from_slice(&chunk);
1154                    Ok::<_, actix_web_v3::Error>(body)
1155                })
1156                .and_then(move |bytes| {
1157                    service.handle_bytes(bytes.freeze()).map(|res| match res {
1158                        ResponseObjects::Empty => Ok(actix_web_v3::dev::ServiceResponse::new(
1159                            req,
1160                            actix_web_v3::HttpResponse::NoContent().finish(),
1161                        )),
1162                        json => Ok(actix_web_v3::dev::ServiceResponse::new(
1163                            req,
1164                            actix_web_v3::HttpResponse::Ok().json(json),
1165                        )),
1166                    })
1167                });
1168            rt
1169        };
1170
1171        actix_service_v1::fn_service::<_, _, _, _, _, _>(inner)
1172    }
1173
1174    #[cfg(feature = "actix-web-v4-integration")]
1175    /// Converts the server into an `actix-web` compatible `NewService`
1176    pub fn into_actix_web_service(
1177        self: Arc<Self>,
1178    ) -> impl actix_service_v2::ServiceFactory<
1179        actix_web_v4::dev::ServiceRequest,
1180        Response = actix_web_v4::dev::ServiceResponse,
1181        Error = actix_web_v4::Error,
1182        Config = (),
1183        InitError = (),
1184    > {
1185        let service = Arc::clone(&self);
1186
1187        let inner = move |req: actix_web_v4::dev::ServiceRequest| {
1188            let service = Arc::clone(&service);
1189            let (req, payload) = req.into_parts();
1190            let rt = payload
1191                .map_err(actix_web_v4::Error::from)
1192                .try_fold(actix_web_v4::web::BytesMut::new(), move |mut body, chunk| async move {
1193                    body.extend_from_slice(&chunk);
1194                    Ok::<_, actix_web_v4::Error>(body)
1195                })
1196                .and_then(move |bytes| {
1197                    service.handle_bytes(bytes.freeze()).map(|res| match res {
1198                        ResponseObjects::Empty => Ok(actix_web_v4::dev::ServiceResponse::new(
1199                            req,
1200                            actix_web_v4::HttpResponse::NoContent().finish(),
1201                        )),
1202                        json => Ok(actix_web_v4::dev::ServiceResponse::new(
1203                            req,
1204                            actix_web_v4::HttpResponse::Ok().json(json),
1205                        )),
1206                    })
1207                });
1208            rt
1209        };
1210
1211        actix_service_v2::fn_service::<_, _, _, _, _, _>(inner)
1212    }
1213
1214    #[cfg(feature = "hyper-integration")]
1215    /// Converts the server into an `actix-web` compatible `NewService`
1216    pub fn into_hyper_web_service(self: Arc<Self>) -> Hyper<R> {
1217        Hyper(self)
1218    }
1219
1220    #[cfg(all(
1221        feature = "actix-web-v1-integration",
1222        not(feature = "hyper-integration"),
1223        not(feature = "actix-web-v2-integration"),
1224        not(feature = "actix-web-v3-integration"),
1225        not(feature = "actix-web-v4-integration")
1226    ))]
1227    /// Is an alias to `into_actix_web_service` or `into_hyper_web_service` depending on which feature is enabled
1228    ///
1229    /// Is not provided when both features are enabled
1230    pub fn into_web_service(
1231        self: Arc<Self>,
1232    ) -> impl actix_service_v04::NewService<
1233        Request = actix_web_v1::dev::ServiceRequest,
1234        Response = actix_web_v1::dev::ServiceResponse,
1235        Error = actix_web_v1::Error,
1236        Config = (),
1237        InitError = (),
1238    > {
1239        self.into_actix_web_service()
1240    }
1241
1242    #[cfg(all(
1243        feature = "actix-web-v2-integration",
1244        not(feature = "hyper-integration"),
1245        not(feature = "actix-web-v1-integration"),
1246        not(feature = "actix-web-v3-integration"),
1247        not(feature = "actix-web-v4-integration")
1248    ))]
1249    /// Is an alias to `into_actix_web_service` or `into_hyper_web_service` depending on which feature is enabled
1250    ///
1251    /// Is not provided when both features are enabled
1252    pub fn into_web_service(
1253        self: Arc<Self>,
1254    ) -> impl actix_service_v1::ServiceFactory<
1255        Request = actix_web_v2::dev::ServiceRequest,
1256        Response = actix_web_v2::dev::ServiceResponse,
1257        Error = actix_web_v2::Error,
1258        Config = (),
1259        InitError = (),
1260    > {
1261        self.into_actix_web_service()
1262    }
1263
1264    #[cfg(all(
1265        feature = "actix-web-v3-integration",
1266        not(feature = "hyper-integration"),
1267        not(feature = "actix-web-v1-integration"),
1268        not(feature = "actix-web-v2-integration"),
1269        not(feature = "actix-web-v4-integration")
1270    ))]
1271    /// Is an alias to `into_actix_web_service` or `into_hyper_web_service` depending on which feature is enabled
1272    ///
1273    /// Is not provided when both features are enabled
1274    pub fn into_web_service(
1275        self: Arc<Self>,
1276    ) -> impl actix_service_v1::ServiceFactory<
1277        Request = actix_web_v3::dev::ServiceRequest,
1278        Response = actix_web_v3::dev::ServiceResponse,
1279        Error = actix_web_v3::Error,
1280        Config = (),
1281        InitError = (),
1282    > {
1283        self.into_actix_web_service()
1284    }
1285
1286    #[cfg(all(
1287        feature = "actix-web-v4-integration",
1288        not(feature = "hyper-integration"),
1289        not(feature = "actix-web-v1-integration"),
1290        not(feature = "actix-web-v2-integration"),
1291        not(feature = "actix-web-v3-integration")
1292    ))]
1293    /// Is an alias to `into_actix_web_service` or `into_hyper_web_service` depending on which feature is enabled
1294    ///
1295    /// Is not provided when both features are enabled
1296    pub fn into_web_service(
1297        self: Arc<Self>,
1298    ) -> impl actix_service_v2::ServiceFactory<
1299        actix_web_v4::dev::ServiceRequest,
1300        Response = actix_web_v4::dev::ServiceResponse,
1301        Error = actix_web_v4::Error,
1302        Config = (),
1303        InitError = (),
1304    > {
1305        self.into_actix_web_service()
1306    }
1307
1308    /// Is an alias to `into_actix_web_service` or `into_hyper_web_service` depending on which feature is enabled
1309    ///
1310    /// Is not provided when both features are enabled
1311    #[cfg(all(
1312        feature = "hyper-integration",
1313        not(feature = "actix-web-v1-integration"),
1314        not(feature = "actix-web-v2-integration"),
1315        not(feature = "actix-web-v3-integration"),
1316        not(feature = "actix-web-v4-integration")
1317    ))]
1318    pub fn into_web_service(self: Arc<Self>) -> Hyper<R> {
1319        self.into_hyper_web_service()
1320    }
1321}
1322
1323#[cfg(feature = "hyper-integration")]
1324pub struct Hyper<R>(pub(crate) Arc<Server<R>>);
1325
1326#[cfg(feature = "hyper-integration")]
1327impl<R> hyper::service::Service<hyper::Request<hyper::Body>> for Hyper<R>
1328where
1329    R: Router + Send + Sync + 'static,
1330{
1331    type Response = hyper::Response<hyper::Body>;
1332    type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
1333    type Future =
1334        std::pin::Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
1335
1336    fn poll_ready(
1337        &mut self,
1338        _cx: &mut std::task::Context<'_>,
1339    ) -> std::task::Poll<Result<(), Self::Error>> {
1340        std::task::Poll::Ready(Ok(()))
1341    }
1342
1343    fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
1344        use hyper::body::HttpBody;
1345
1346        let service = Arc::clone(&self.0);
1347
1348        let rt = async move {
1349            let mut buf = if let Some(content_length) = req
1350                .headers()
1351                .get(hyper::header::CONTENT_LENGTH)
1352                .and_then(|x| x.to_str().ok())
1353                .and_then(|x| x.parse().ok())
1354            {
1355                bytes_v10::BytesMut::with_capacity(content_length)
1356            } else {
1357                bytes_v10::BytesMut::default()
1358            };
1359
1360            let mut body = req.into_body();
1361
1362            while let Some(chunk) = body.data().await {
1363                buf.extend(chunk?);
1364            }
1365
1366            match service.handle_bytes(buf.freeze()).await {
1367                ResponseObjects::Empty => hyper::Response::builder()
1368                    .status(hyper::StatusCode::NO_CONTENT)
1369                    .body(hyper::Body::from(Vec::<u8>::new()))
1370                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
1371                json => serde_json::to_vec(&json)
1372                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1373                    .and_then(|json| {
1374                        hyper::Response::builder()
1375                            .status(hyper::StatusCode::OK)
1376                            .header("Content-Type", "application/json")
1377                            .body(hyper::Body::from(json))
1378                            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1379                    }),
1380            }
1381        };
1382        Box::pin(rt)
1383    }
1384}
1385
1386#[cfg(feature = "hyper-integration")]
1387impl<'a, R> tower_service::Service<&'a hyper::server::conn::AddrStream> for Hyper<R> {
1388    type Response = Hyper<R>;
1389    type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
1390    type Future = future::Ready<Result<Self::Response, Self::Error>>;
1391
1392    fn poll_ready(
1393        &mut self,
1394        _cx: &mut std::task::Context<'_>,
1395    ) -> std::task::Poll<Result<(), Self::Error>> {
1396        std::task::Poll::Ready(Ok(()))
1397    }
1398
1399    fn call(&mut self, _: &'a hyper::server::conn::AddrStream) -> Self::Future {
1400        future::ready(Ok(Hyper(Arc::clone(&self.0))))
1401    }
1402}