1use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize, Serializer};
64
65use serde_json::{value::RawValue, Value};
66
67use std::sync::Arc;
68
69use futures::{
70 future::{self, Future, FutureExt},
71 stream::StreamExt,
72};
73
74#[cfg(any(
75 feature = "actix-web-v1",
76 feature = "actix-web-v2",
77 feature = "actix-web-v3",
78 feature = "actix-web-v4"
79))]
80use futures::future::TryFutureExt;
81
82#[cfg(any(feature = "actix-web-v2", feature = "actix-web-v3", feature = "actix-web-v4"))]
83use futures::stream::TryStreamExt;
84
85use extensions::concurrent::Extensions;
86use std::{collections::HashMap, marker::PhantomData};
87
88#[cfg(feature = "bytes-v10")]
89use bytes_v10::Bytes;
90
91#[cfg(feature = "bytes-v05")]
92use bytes_v05::Bytes;
93
94#[cfg(feature = "bytes-v04")]
95use bytes_v04::Bytes;
96
97#[cfg(feature = "macros")]
98pub use jsonrpc_v2_macros::jsonrpc_v2_method;
99
100#[cfg(feature = "hyper-integration")]
101pub use factory::hyper::*;
102
103#[cfg(any(
104 feature = "actix-web-v1",
105 feature = "actix-web-v2",
106 feature = "actix-web-v3",
107 feature = "actix-web-v4"
108))]
109pub use factory::actix::*;
110
111mod factory;
112
113#[cfg(feature = "macros")]
114pub mod exp {
115 pub use serde;
116 pub use serde_json;
117}
118
119type BoxedSerialize = Box<dyn erased_serde::Serialize + Send>;
120
121#[derive(Serialize)]
123#[serde(untagged)]
124pub enum Error {
125 Full {
126 code: i64,
127 message: String,
128 #[serde(skip_serializing_if = "Option::is_none")]
129 data: Option<BoxedSerialize>,
130 },
131 Provided {
132 code: i64,
133 message: &'static str,
134 },
135}
136
137impl Error {
138 pub const INVALID_REQUEST: Self = Error::Provided { code: -32600, message: "Invalid Request" };
139 pub const METHOD_NOT_FOUND: Self =
140 Error::Provided { code: -32601, message: "Method not found" };
141 pub const INVALID_PARAMS: Self = Error::Provided { code: -32602, message: "Invalid params" };
142 pub const INTERNAL_ERROR: Self = Error::Provided { code: -32603, message: "Internal Error" };
143 pub const PARSE_ERROR: Self = Error::Provided { code: -32700, message: "Parse error" };
144
145 pub fn internal<D: std::fmt::Display + Send>(e: D) -> Self {
146 Error::Full {
147 code: -32603,
148 message: "Internal Error".into(),
149 data: Some(Box::new(e.to_string())),
150 }
151 }
152
153 pub fn invalid_params(e: serde_json::Error) -> Self {
154 Error::Full {
155 code: -32602,
156 message: "Invalid params".into(),
157 data: Some(Box::new(format!("{e:#?}"))),
158 }
159 }
160}
161
162pub trait ErrorLike: std::fmt::Display {
164 fn code(&self) -> i64 {
166 0
167 }
168
169 fn message(&self) -> String {
171 self.to_string()
172 }
173
174 fn data(&self) -> Option<BoxedSerialize> {
176 None
177 }
178}
179
180impl<T> From<T> for Error
181where
182 T: ErrorLike,
183{
184 fn from(t: T) -> Error {
185 Error::Full { code: t.code(), message: t.message(), data: t.data() }
186 }
187}
188
189#[cfg(feature = "easy-errors")]
190impl<T> ErrorLike for T where T: std::fmt::Display {}
191
192#[doc(hidden)]
193#[derive(Default, Debug)]
194pub struct V2;
195
196impl Serialize for V2 {
197 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
198 where
199 S: Serializer,
200 {
201 "2.0".serialize(serializer)
202 }
203}
204
205impl<'de> Deserialize<'de> for V2 {
206 fn deserialize<D>(deserializer: D) -> Result<V2, D::Error>
207 where
208 D: Deserializer<'de>,
209 {
210 let s: &str = Deserialize::deserialize(deserializer)?;
211 if s == "2.0" {
212 Ok(V2)
213 } else {
214 Err(serde::de::Error::custom("Could not deserialize V2"))
215 }
216 }
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
222#[serde(untagged)]
223pub enum Id {
224 Num(i64),
225 Str(Box<str>),
226 Null,
227}
228
229impl From<i64> for Id {
230 fn from(t: i64) -> Self {
231 Id::Num(t)
232 }
233}
234
235impl<'a> From<&'a str> for Id {
236 fn from(t: &'a str) -> Self {
237 Id::Str(t.into())
238 }
239}
240
241impl From<String> for Id {
242 fn from(t: String) -> Self {
243 Id::Str(t.into())
244 }
245}
246
247impl Default for Id {
248 fn default() -> Self {
249 Id::Null
250 }
251}
252
253#[derive(Debug)]
255pub struct Method(Box<str>);
256
257impl Method {
258 pub fn as_str(&self) -> &str {
259 &self.0
260 }
261}
262
263impl From<Method> for String {
264 fn from(t: Method) -> String {
265 String::from(t.0)
266 }
267}
268
269#[derive(Default)]
271pub struct RequestBuilder<M = ()> {
272 id: Id,
273 params: Option<Value>,
274 method: M,
275}
276
277impl<M> RequestBuilder<M> {
278 pub fn with_id<I: Into<Id>>(mut self, id: I) -> Self {
279 self.id = id.into();
280 self
281 }
282
283 pub fn with_params<I: Into<Value>>(mut self, params: I) -> Self {
284 self.params = Some(params.into());
285 self
286 }
287}
288
289impl RequestBuilder<()> {
290 pub fn with_method<I: Into<String>>(self, method: I) -> RequestBuilder<String> {
291 let RequestBuilder { id, params, .. } = self;
292 RequestBuilder { id, params, method: method.into() }
293 }
294}
295
296impl RequestBuilder<String> {
297 pub fn finish(self) -> RequestObject {
298 let RequestBuilder { id, params, method } = self;
299 RequestObject {
300 jsonrpc: V2,
301 method: method.into_boxed_str(),
302 params: params.map(InnerParams::Value),
303 id: Some(Some(id)),
304 }
305 }
306}
307
308#[derive(Default)]
310pub struct NotificationBuilder<M = ()> {
311 params: Option<Value>,
312 method: M,
313}
314
315impl<M> NotificationBuilder<M> {
316 pub fn with_params<I: Into<Value>>(mut self, params: I) -> Self {
317 self.params = Some(params.into());
318 self
319 }
320}
321
322impl NotificationBuilder<()> {
323 pub fn with_method<I: Into<String>>(self, method: I) -> NotificationBuilder<String> {
324 let NotificationBuilder { params, .. } = self;
325 NotificationBuilder { params, method: method.into() }
326 }
327}
328
329impl NotificationBuilder<String> {
330 pub fn finish(self) -> RequestObject {
331 let NotificationBuilder { method, params } = self;
332 RequestObject {
333 jsonrpc: V2,
334 method: method.into_boxed_str(),
335 params: params.map(InnerParams::Value),
336 id: None,
337 }
338 }
339}
340
341#[derive(Debug, Serialize, Deserialize)]
342#[serde(untagged)]
343enum InnerParams {
344 Value(Value),
345 Raw(Box<RawValue>),
346}
347
348#[derive(Debug, Deserialize, Serialize, Default)]
350#[serde(default)]
351pub struct RequestObject {
352 jsonrpc: V2,
353 method: Box<str>,
354 params: Option<InnerParams>,
355 #[serde(deserialize_with = "RequestObject::deserialize_id")]
356 #[serde(skip_serializing_if = "Option::is_none")]
357 id: Option<Option<Id>>,
358}
359
360impl RequestObject {
361 pub fn method_ref(&self) -> &str {
362 &self.method
363 }
364
365 pub fn id_ref(&self) -> Option<&Id> {
366 self.id.as_ref().and_then(|x| x.as_ref())
367 }
368}
369
370#[derive(Debug, Deserialize, Default)]
372#[serde(default)]
373struct BytesRequestObject {
374 jsonrpc: V2,
375 method: Box<str>,
376 params: Option<Box<RawValue>>,
377 #[serde(deserialize_with = "RequestObject::deserialize_id")]
378 #[serde(skip_serializing_if = "Option::is_none")]
379 id: Option<Option<Id>>,
380}
381
382impl From<BytesRequestObject> for RequestObject {
383 fn from(t: BytesRequestObject) -> Self {
384 let BytesRequestObject { jsonrpc, method, params, id } = t;
385 RequestObject { jsonrpc, method, params: params.map(InnerParams::Raw), id }
386 }
387}
388
389impl RequestObject {
390 pub fn request() -> RequestBuilder {
392 RequestBuilder::default()
393 }
394
395 pub fn notification() -> NotificationBuilder {
397 NotificationBuilder::default()
398 }
399
400 fn deserialize_id<'de, D>(deserializer: D) -> Result<Option<Option<Id>>, D::Error>
401 where
402 D: Deserializer<'de>,
403 {
404 Ok(Some(Option::deserialize(deserializer)?))
405 }
406}
407
408#[doc(hidden)]
409pub struct RequestObjectWithData {
410 inner: RequestObject,
411 data: Arc<Extensions>,
412}
413
414#[derive(Deserialize)]
423pub struct Params<T>(pub T);
424
425impl<T> Params<T>
426where
427 T: DeserializeOwned,
428{
429 fn from_request_object(req: &RequestObject) -> Result<Self, Error> {
430 let res = match req.params {
431 Some(InnerParams::Raw(ref value)) => serde_json::from_str(value.get()),
432 Some(InnerParams::Value(ref value)) => serde_json::from_value(value.clone()),
433 None => serde_json::from_value(Value::Null),
434 };
435 res.map(Params).map_err(Error::invalid_params)
436 }
437}
438
439#[async_trait::async_trait]
441pub trait FromRequest: Sized {
442 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error>;
443}
444
445#[async_trait::async_trait]
446impl FromRequest for () {
447 async fn from_request(_: &RequestObjectWithData) -> Result<Self, Error> {
448 Ok(())
449 }
450}
451
452pub struct Data<T>(pub Arc<T>);
454
455impl<T> Data<T> {
456 pub fn new(t: T) -> Self {
457 Data(Arc::new(t))
458 }
459}
460
461impl<T> std::ops::Deref for Data<T> {
462 type Target = T;
463
464 fn deref(&self) -> &Self::Target {
465 &*self.0
466 }
467}
468
469#[async_trait::async_trait]
470impl<T: Send + Sync + 'static> FromRequest for Data<T> {
471 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
472 let out = req.data.get::<Data<T>>().map(|x| Data(Arc::clone(&x.0))).ok_or_else(|| {
473 Error::internal(format!("Missing data for: `{}`", std::any::type_name::<T>()))
474 })?;
475 Ok(out)
476 }
477}
478
479#[async_trait::async_trait]
480impl<T: DeserializeOwned> FromRequest for Params<T> {
481 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
482 Ok(Self::from_request_object(&req.inner)?)
483 }
484}
485
486#[async_trait::async_trait]
487impl FromRequest for Id {
488 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
489 Ok(req
490 .inner
491 .id
492 .clone()
493 .and_then(|x| x)
494 .ok_or_else(|| Error::internal("No `id` provided"))?)
495 }
496}
497
498#[async_trait::async_trait]
499impl FromRequest for Method {
500 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
501 Ok(Method(req.inner.method.clone()))
502 }
503}
504
505#[async_trait::async_trait]
506impl<T: FromRequest> FromRequest for Option<T> {
507 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
508 Ok(T::from_request(req).await.ok())
509 }
510}
511
512#[async_trait::async_trait]
513impl<T1> FromRequest for (T1,)
514where
515 T1: FromRequest + Send,
516{
517 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
518 Ok((T1::from_request(req).await?,))
519 }
520}
521
522#[async_trait::async_trait]
523impl<T1, T2> FromRequest for (T1, T2)
524where
525 T1: FromRequest + Send,
526 T2: FromRequest + Send,
527{
528 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
529 let (t1, t2) = futures::join!(T1::from_request(req), T2::from_request(req));
530 Ok((t1?, t2?))
531 }
532}
533
534#[async_trait::async_trait]
535impl<T1, T2, T3> FromRequest for (T1, T2, T3)
536where
537 T1: FromRequest + Send,
538 T2: FromRequest + Send,
539 T3: FromRequest + Send,
540{
541 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
542 let (t1, t2, t3) =
543 futures::join!(T1::from_request(req), T2::from_request(req), T3::from_request(req));
544 Ok((t1?, t2?, t3?))
545 }
546}
547
548#[async_trait::async_trait]
549impl<T1, T2, T3, T4> FromRequest for (T1, T2, T3, T4)
550where
551 T1: FromRequest + Send,
552 T2: FromRequest + Send,
553 T3: FromRequest + Send,
554 T4: FromRequest + Send,
555{
556 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
557 let (t1, t2, t3, t4) = futures::join!(
558 T1::from_request(req),
559 T2::from_request(req),
560 T3::from_request(req),
561 T4::from_request(req)
562 );
563 Ok((t1?, t2?, t3?, t4?))
564 }
565}
566
567#[async_trait::async_trait]
568impl<T1, T2, T3, T4, T5> FromRequest for (T1, T2, T3, T4, T5)
569where
570 T1: FromRequest + Send,
571 T2: FromRequest + Send,
572 T3: FromRequest + Send,
573 T4: FromRequest + Send,
574 T5: FromRequest + Send,
575{
576 async fn from_request(req: &RequestObjectWithData) -> Result<Self, Error> {
577 let (t1, t2, t3, t4, t5) = futures::join!(
578 T1::from_request(req),
579 T2::from_request(req),
580 T3::from_request(req),
581 T4::from_request(req),
582 T5::from_request(req)
583 );
584 Ok((t1?, t2?, t3?, t4?, t5?))
585 }
586}
587
588#[doc(hidden)]
589struct Handler<F, S, E, T>
590where
591 F: Factory<S, E, T>,
592{
593 hnd: F,
594 _t: PhantomData<fn() -> (S, E, T)>,
595}
596
597impl<F, S, E, T> Handler<F, S, E, T>
598where
599 F: Factory<S, E, T>,
600{
601 fn new(hnd: F) -> Self {
602 Handler { hnd, _t: PhantomData }
603 }
604}
605
606#[cfg(any(
607 feature = "actix-web-v1",
608 feature = "actix-web-v2",
609 feature = "actix-web-v3",
610 feature = "actix-web-v4"
611))]
612impl<F, S, E, T> From<Handler<F, S, E, T>> for BoxedHandler
613where
614 F: Factory<S, E, T> + 'static + Send + Sync,
615 S: Serialize + Send + 'static,
616 Error: From<E>,
617 E: 'static,
618 T: FromRequest + 'static + Send,
619{
620 fn from(t: Handler<F, S, E, T>) -> BoxedHandler {
621 let hnd = Arc::new(t.hnd);
622
623 let inner = move |req: RequestObjectWithData| {
624 let hnd = Arc::clone(&hnd);
625 Box::pin(async move {
626 let out = {
627 let param = T::from_request(&req).await?;
628 hnd.call(param).await?
629 };
630 Ok(Box::new(out) as BoxedSerialize)
631 }) as std::pin::Pin<Box<dyn Future<Output = Result<BoxedSerialize, Error>>>>
632 };
633
634 BoxedHandler(Box::new(inner))
635 }
636}
637
638#[cfg(feature = "hyper-integration")]
639impl<F, S, E, T> From<Handler<F, S, E, T>> for BoxedHandler
640where
641 F: Factory<S, E, T> + 'static + Send + Sync,
642 S: Serialize + Send + 'static,
643 Error: From<E>,
644 E: 'static,
645 T: FromRequest + 'static + Send,
646{
647 fn from(t: Handler<F, S, E, T>) -> BoxedHandler {
648 let hnd = Arc::new(t.hnd);
649
650 let inner = move |req: RequestObjectWithData| {
651 let hnd = Arc::clone(&hnd);
652 Box::pin(async move {
653 let out = {
654 let param = T::from_request(&req).await?;
655 hnd.call(param).await?
656 };
657 Ok(Box::new(out) as BoxedSerialize)
658 })
659 as std::pin::Pin<Box<dyn Future<Output = Result<BoxedSerialize, Error>> + Send>>
660 };
661
662 BoxedHandler(Box::new(inner))
663 }
664}
665
666#[cfg(any(
667 feature = "actix-web-v1",
668 feature = "actix-web-v2",
669 feature = "actix-web-v3",
670 feature = "actix-web-v4"
671))]
672pub struct BoxedHandler(
673 Box<
674 dyn Fn(
675 RequestObjectWithData,
676 )
677 -> std::pin::Pin<Box<dyn Future<Output = Result<BoxedSerialize, Error>>>>
678 + Send
679 + Sync,
680 >,
681);
682
683#[cfg(feature = "hyper-integration")]
684pub struct BoxedHandler(
685 Box<
686 dyn Fn(
687 RequestObjectWithData,
688 )
689 -> std::pin::Pin<Box<dyn Future<Output = Result<BoxedSerialize, Error>> + Send>>
690 + Send
691 + Sync,
692 >,
693);
694
695pub struct MapRouter(HashMap<String, BoxedHandler>);
696
697impl Default for MapRouter {
698 fn default() -> Self {
699 MapRouter(HashMap::default())
700 }
701}
702
703pub trait Router: Default {
704 fn get(&self, name: &str) -> Option<&BoxedHandler>;
705 fn insert(&mut self, name: String, handler: BoxedHandler) -> Option<BoxedHandler>;
706}
707
708impl Router for MapRouter {
709 fn get(&self, name: &str) -> Option<&BoxedHandler> {
710 self.0.get(name)
711 }
712 fn insert(&mut self, name: String, handler: BoxedHandler) -> Option<BoxedHandler> {
713 self.0.insert(name, handler)
714 }
715}
716
717pub struct Server<R> {
719 data: Arc<Extensions>,
720 router: R,
721}
722
723pub struct ServerBuilder<R> {
727 data: Extensions,
728 router: R,
729}
730
731impl Server<MapRouter> {
732 pub fn new() -> ServerBuilder<MapRouter> {
733 Server::with_router(MapRouter::default())
734 }
735}
736
737impl<R: Router> Server<R> {
738 pub fn with_router(router: R) -> ServerBuilder<R> {
739 ServerBuilder { data: Extensions::new(), router }
740 }
741}
742
743impl<R: Router> ServerBuilder<R> {
744 pub fn with_data<T: Send + Sync + 'static>(mut self, data: Data<T>) -> Self {
746 self.data.insert(data);
747 self
748 }
749
750 pub fn with_method<N, S, E, F, T>(mut self, name: N, handler: F) -> Self
759 where
760 N: Into<String>,
761 F: Factory<S, E, T> + Send + Sync + 'static,
762 S: Serialize + Send + 'static,
763 Error: From<E>,
764 E: 'static,
765 T: FromRequest + Send + 'static,
766 {
767 self.router.insert(name.into(), Handler::new(handler).into());
768 self
769 }
770
771 pub fn finish(self) -> Arc<Server<R>> {
773 let ServerBuilder { router, data } = self;
774 Arc::new(Server { router, data: Arc::new(data) })
775 }
776
777 pub fn finish_unwrapped(self) -> Server<R> {
779 let ServerBuilder { router, data } = self;
780 Server { router, data: Arc::new(data) }
781 }
782}
783
784#[derive(Serialize)]
786#[serde(untagged)]
787pub enum ResponseObject {
788 Result { jsonrpc: V2, result: BoxedSerialize, id: Id },
789 Error { jsonrpc: V2, error: Error, id: Id },
790}
791
792impl ResponseObject {
793 fn result(result: BoxedSerialize, id: Id) -> Self {
794 ResponseObject::Result { jsonrpc: V2, result, id }
795 }
796
797 fn error(error: Error, id: Id) -> Self {
798 ResponseObject::Error { jsonrpc: V2, error, id }
799 }
800}
801
802#[derive(Serialize)]
804#[serde(untagged)]
805pub enum ResponseObjects {
806 One(ResponseObject),
807 Many(Vec<ResponseObject>),
808 Empty,
809}
810
811#[derive(Serialize)]
812#[serde(untagged)]
813enum ManyResponseObjects {
814 Many(Vec<ResponseObject>),
815 Empty,
816}
817
818#[derive(Serialize)]
819#[serde(untagged)]
820enum SingleResponseObject {
821 One(ResponseObject),
822 Empty,
823}
824
825impl From<ManyResponseObjects> for ResponseObjects {
826 fn from(t: ManyResponseObjects) -> Self {
827 match t {
828 ManyResponseObjects::Many(many) => ResponseObjects::Many(many),
829 ManyResponseObjects::Empty => ResponseObjects::Empty,
830 }
831 }
832}
833
834impl From<SingleResponseObject> for ResponseObjects {
835 fn from(t: SingleResponseObject) -> Self {
836 match t {
837 SingleResponseObject::One(one) => ResponseObjects::One(one),
838 SingleResponseObject::Empty => ResponseObjects::Empty,
839 }
840 }
841}
842
843impl SingleResponseObject {
844 fn result(result: BoxedSerialize, opt_id: Option<Id>) -> Self {
845 opt_id
846 .map(|id| SingleResponseObject::One(ResponseObject::result(result, id)))
847 .unwrap_or_else(|| SingleResponseObject::Empty)
848 }
849
850 fn error(error: Error, opt_id: Option<Id>) -> Self {
851 opt_id
852 .map(|id| SingleResponseObject::One(ResponseObject::error(error, id)))
853 .unwrap_or_else(|| SingleResponseObject::Empty)
854 }
855}
856
857pub enum RequestKind {
862 RequestObject(RequestObject),
863 ManyRequestObjects(Vec<RequestObject>),
864 Bytes(Bytes),
865}
866
867impl From<RequestObject> for RequestKind {
868 fn from(t: RequestObject) -> Self {
869 RequestKind::RequestObject(t)
870 }
871}
872
873impl From<Vec<RequestObject>> for RequestKind {
874 fn from(t: Vec<RequestObject>) -> Self {
875 RequestKind::ManyRequestObjects(t)
876 }
877}
878
879impl From<Bytes> for RequestKind {
880 fn from(t: Bytes) -> Self {
881 RequestKind::Bytes(t)
882 }
883}
884
885impl<'a> From<&'a [u8]> for RequestKind {
886 fn from(t: &'a [u8]) -> Self {
887 Bytes::from(t.to_vec()).into()
888 }
889}
890
891#[derive(Debug)]
892enum OneOrManyRawValues<'a> {
893 Many(Vec<&'a RawValue>),
894 One(&'a RawValue),
895}
896
897impl<'a> OneOrManyRawValues<'a> {
898 pub fn try_from_slice(slice: &'a [u8]) -> Result<Self, serde_json::Error> {
899 if slice.first() == Some(&b'[') {
900 Ok(OneOrManyRawValues::Many(serde_json::from_slice::<Vec<&RawValue>>(slice)?))
901 } else {
902 Ok(OneOrManyRawValues::One(serde_json::from_slice::<&RawValue>(slice)?))
903 }
904 }
905}
906
907impl<R> Server<R>
908where
909 R: Router + 'static,
910{
911 #[cfg(feature = "actix-web-v1-integration")]
912 fn handle_bytes_compat(
913 &self,
914 bytes: Bytes,
915 ) -> impl futures_v01::Future<Item = ResponseObjects, Error = ()> {
916 self.handle_bytes(bytes).unit_error().boxed().compat()
917 }
918
919 pub fn handle<I: Into<RequestKind>>(&self, req: I) -> impl Future<Output = ResponseObjects> {
921 match req.into() {
922 RequestKind::Bytes(bytes) => future::Either::Left(self.handle_bytes(bytes)),
923 RequestKind::RequestObject(req) => future::Either::Right(future::Either::Left(
924 self.handle_request_object(req).map(From::from),
925 )),
926 RequestKind::ManyRequestObjects(reqs) => future::Either::Right(future::Either::Right(
927 self.handle_many_request_objects(reqs).map(From::from),
928 )),
929 }
930 }
931
932 fn handle_request_object(
933 &self,
934 req: RequestObject,
935 ) -> impl Future<Output = SingleResponseObject> {
936 let req = RequestObjectWithData { inner: req, data: Arc::clone(&self.data) };
937
938 let opt_id = match req.inner.id {
939 Some(Some(ref id)) => Some(id.clone()),
940 Some(None) => Some(Id::Null),
941 None => None,
942 };
943
944 if let Some(method) = self.router.get(req.inner.method.as_ref()) {
945 let out = (&method.0)(req).then(|res| match res {
946 Ok(val) => future::ready(SingleResponseObject::result(val, opt_id)),
947 Err(e) => future::ready(SingleResponseObject::error(e, opt_id)),
948 });
949 future::Either::Left(out)
950 } else {
951 future::Either::Right(future::ready(SingleResponseObject::error(
952 Error::METHOD_NOT_FOUND,
953 opt_id,
954 )))
955 }
956 }
957
958 fn handle_many_request_objects<I: IntoIterator<Item = RequestObject>>(
959 &self,
960 reqs: I,
961 ) -> impl Future<Output = ManyResponseObjects> {
962 reqs.into_iter()
963 .map(|r| self.handle_request_object(r))
964 .collect::<futures::stream::FuturesUnordered<_>>()
965 .filter_map(|res| async move {
966 match res {
967 SingleResponseObject::One(r) => Some(r),
968 _ => None,
969 }
970 })
971 .collect::<Vec<_>>()
972 .map(|vec| {
973 if vec.is_empty() {
974 ManyResponseObjects::Empty
975 } else {
976 ManyResponseObjects::Many(vec)
977 }
978 })
979 }
980
981 fn handle_bytes(&self, bytes: Bytes) -> impl Future<Output = ResponseObjects> {
982 if let Ok(raw_values) = OneOrManyRawValues::try_from_slice(bytes.as_ref()) {
983 match raw_values {
984 OneOrManyRawValues::Many(raw_reqs) => {
985 if raw_reqs.is_empty() {
986 return future::Either::Left(future::ready(ResponseObjects::One(
987 ResponseObject::error(Error::INVALID_REQUEST, Id::Null),
988 )));
989 }
990
991 let (okays, errs) = raw_reqs
992 .into_iter()
993 .map(|x| {
994 serde_json::from_str::<BytesRequestObject>(x.get())
995 .map(RequestObject::from)
996 })
997 .partition::<Vec<_>, _>(|x| x.is_ok());
998
999 let errs = errs
1000 .into_iter()
1001 .map(|_| ResponseObject::error(Error::INVALID_REQUEST, Id::Null))
1002 .collect::<Vec<_>>();
1003
1004 future::Either::Right(future::Either::Left(
1005 self.handle_many_request_objects(okays.into_iter().flat_map(|x| x)).map(
1006 |res| match res {
1007 ManyResponseObjects::Many(mut many) => {
1008 many.extend(errs);
1009 ResponseObjects::Many(many)
1010 }
1011 ManyResponseObjects::Empty => {
1012 if errs.is_empty() {
1013 ResponseObjects::Empty
1014 } else {
1015 ResponseObjects::Many(errs)
1016 }
1017 }
1018 },
1019 ),
1020 ))
1021 }
1022 OneOrManyRawValues::One(raw_req) => {
1023 match serde_json::from_str::<BytesRequestObject>(raw_req.get())
1024 .map(RequestObject::from)
1025 {
1026 Ok(rn) => future::Either::Right(future::Either::Right(
1027 self.handle_request_object(rn).map(|res| match res {
1028 SingleResponseObject::One(r) => ResponseObjects::One(r),
1029 _ => ResponseObjects::Empty,
1030 }),
1031 )),
1032 Err(_) => future::Either::Left(future::ready(ResponseObjects::One(
1033 ResponseObject::error(Error::INVALID_REQUEST, Id::Null),
1034 ))),
1035 }
1036 }
1037 }
1038 } else {
1039 future::Either::Left(future::ready(ResponseObjects::One(ResponseObject::error(
1040 Error::PARSE_ERROR,
1041 Id::Null,
1042 ))))
1043 }
1044 }
1045
1046 #[cfg(feature = "actix-web-v1-integration")]
1047 pub fn into_actix_web_service(
1049 self: Arc<Self>,
1050 ) -> impl actix_service_v04::NewService<
1051 Request = actix_web_v1::dev::ServiceRequest,
1052 Response = actix_web_v1::dev::ServiceResponse,
1053 Error = actix_web_v1::Error,
1054 Config = (),
1055 InitError = (),
1056 > {
1057 use futures_v01::{Future, Stream};
1058
1059 let service = Arc::clone(&self);
1060
1061 let inner = move |req: actix_web_v1::dev::ServiceRequest| {
1062 let service = Arc::clone(&service);
1063 let (req, payload) = req.into_parts();
1064 let rt = payload
1065 .map_err(actix_web_v1::Error::from)
1066 .fold(actix_web_v1::web::BytesMut::new(), move |mut body, chunk| {
1067 body.extend_from_slice(&chunk);
1068 Ok::<_, actix_web_v1::Error>(body)
1069 })
1070 .and_then(move |bytes| {
1071 service.handle_bytes_compat(bytes.freeze()).then(|res| match res {
1072 Ok(res_inner) => match res_inner {
1073 ResponseObjects::Empty => Ok(actix_web_v1::dev::ServiceResponse::new(
1074 req,
1075 actix_web_v1::HttpResponse::NoContent().finish(),
1076 )),
1077 json => Ok(actix_web_v1::dev::ServiceResponse::new(
1078 req,
1079 actix_web_v1::HttpResponse::Ok().json(json),
1080 )),
1081 },
1082 Err(_) => Ok(actix_web_v1::dev::ServiceResponse::new(
1083 req,
1084 actix_web_v1::HttpResponse::InternalServerError().into(),
1085 )),
1086 })
1087 });
1088 rt
1089 };
1090
1091 actix_service_v04::service_fn::<_, _, _, ()>(inner)
1092 }
1093
1094 #[cfg(feature = "actix-web-v2-integration")]
1095 pub fn into_actix_web_service(
1097 self: Arc<Self>,
1098 ) -> impl actix_service_v1::ServiceFactory<
1099 Request = actix_web_v2::dev::ServiceRequest,
1100 Response = actix_web_v2::dev::ServiceResponse,
1101 Error = actix_web_v2::Error,
1102 Config = (),
1103 InitError = (),
1104 > {
1105 let service = Arc::clone(&self);
1106
1107 let inner = move |req: actix_web_v2::dev::ServiceRequest| {
1108 let service = Arc::clone(&service);
1109 let (req, payload) = req.into_parts();
1110 let rt = payload
1111 .map_err(actix_web_v2::Error::from)
1112 .try_fold(actix_web_v2::web::BytesMut::new(), move |mut body, chunk| async move {
1113 body.extend_from_slice(&chunk);
1114 Ok::<_, actix_web_v2::Error>(body)
1115 })
1116 .and_then(move |bytes| {
1117 service.handle_bytes(bytes.freeze()).map(|res| match res {
1118 ResponseObjects::Empty => Ok(actix_web_v2::dev::ServiceResponse::new(
1119 req,
1120 actix_web_v2::HttpResponse::NoContent().finish(),
1121 )),
1122 json => Ok(actix_web_v2::dev::ServiceResponse::new(
1123 req,
1124 actix_web_v2::HttpResponse::Ok().json(json),
1125 )),
1126 })
1127 });
1128 rt
1129 };
1130
1131 actix_service_v1::fn_service::<_, _, _, _, _, _>(inner)
1132 }
1133
1134 #[cfg(feature = "actix-web-v3-integration")]
1135 pub fn into_actix_web_service(
1137 self: Arc<Self>,
1138 ) -> impl actix_service_v1::ServiceFactory<
1139 Request = actix_web_v3::dev::ServiceRequest,
1140 Response = actix_web_v3::dev::ServiceResponse,
1141 Error = actix_web_v3::Error,
1142 Config = (),
1143 InitError = (),
1144 > {
1145 let service = Arc::clone(&self);
1146
1147 let inner = move |req: actix_web_v3::dev::ServiceRequest| {
1148 let service = Arc::clone(&service);
1149 let (req, payload) = req.into_parts();
1150 let rt = payload
1151 .map_err(actix_web_v3::Error::from)
1152 .try_fold(actix_web_v3::web::BytesMut::new(), move |mut body, chunk| async move {
1153 body.extend_from_slice(&chunk);
1154 Ok::<_, actix_web_v3::Error>(body)
1155 })
1156 .and_then(move |bytes| {
1157 service.handle_bytes(bytes.freeze()).map(|res| match res {
1158 ResponseObjects::Empty => Ok(actix_web_v3::dev::ServiceResponse::new(
1159 req,
1160 actix_web_v3::HttpResponse::NoContent().finish(),
1161 )),
1162 json => Ok(actix_web_v3::dev::ServiceResponse::new(
1163 req,
1164 actix_web_v3::HttpResponse::Ok().json(json),
1165 )),
1166 })
1167 });
1168 rt
1169 };
1170
1171 actix_service_v1::fn_service::<_, _, _, _, _, _>(inner)
1172 }
1173
1174 #[cfg(feature = "actix-web-v4-integration")]
1175 pub fn into_actix_web_service(
1177 self: Arc<Self>,
1178 ) -> impl actix_service_v2::ServiceFactory<
1179 actix_web_v4::dev::ServiceRequest,
1180 Response = actix_web_v4::dev::ServiceResponse,
1181 Error = actix_web_v4::Error,
1182 Config = (),
1183 InitError = (),
1184 > {
1185 let service = Arc::clone(&self);
1186
1187 let inner = move |req: actix_web_v4::dev::ServiceRequest| {
1188 let service = Arc::clone(&service);
1189 let (req, payload) = req.into_parts();
1190 let rt = payload
1191 .map_err(actix_web_v4::Error::from)
1192 .try_fold(actix_web_v4::web::BytesMut::new(), move |mut body, chunk| async move {
1193 body.extend_from_slice(&chunk);
1194 Ok::<_, actix_web_v4::Error>(body)
1195 })
1196 .and_then(move |bytes| {
1197 service.handle_bytes(bytes.freeze()).map(|res| match res {
1198 ResponseObjects::Empty => Ok(actix_web_v4::dev::ServiceResponse::new(
1199 req,
1200 actix_web_v4::HttpResponse::NoContent().finish(),
1201 )),
1202 json => Ok(actix_web_v4::dev::ServiceResponse::new(
1203 req,
1204 actix_web_v4::HttpResponse::Ok().json(json),
1205 )),
1206 })
1207 });
1208 rt
1209 };
1210
1211 actix_service_v2::fn_service::<_, _, _, _, _, _>(inner)
1212 }
1213
1214 #[cfg(feature = "hyper-integration")]
1215 pub fn into_hyper_web_service(self: Arc<Self>) -> Hyper<R> {
1217 Hyper(self)
1218 }
1219
1220 #[cfg(all(
1221 feature = "actix-web-v1-integration",
1222 not(feature = "hyper-integration"),
1223 not(feature = "actix-web-v2-integration"),
1224 not(feature = "actix-web-v3-integration"),
1225 not(feature = "actix-web-v4-integration")
1226 ))]
1227 pub fn into_web_service(
1231 self: Arc<Self>,
1232 ) -> impl actix_service_v04::NewService<
1233 Request = actix_web_v1::dev::ServiceRequest,
1234 Response = actix_web_v1::dev::ServiceResponse,
1235 Error = actix_web_v1::Error,
1236 Config = (),
1237 InitError = (),
1238 > {
1239 self.into_actix_web_service()
1240 }
1241
1242 #[cfg(all(
1243 feature = "actix-web-v2-integration",
1244 not(feature = "hyper-integration"),
1245 not(feature = "actix-web-v1-integration"),
1246 not(feature = "actix-web-v3-integration"),
1247 not(feature = "actix-web-v4-integration")
1248 ))]
1249 pub fn into_web_service(
1253 self: Arc<Self>,
1254 ) -> impl actix_service_v1::ServiceFactory<
1255 Request = actix_web_v2::dev::ServiceRequest,
1256 Response = actix_web_v2::dev::ServiceResponse,
1257 Error = actix_web_v2::Error,
1258 Config = (),
1259 InitError = (),
1260 > {
1261 self.into_actix_web_service()
1262 }
1263
1264 #[cfg(all(
1265 feature = "actix-web-v3-integration",
1266 not(feature = "hyper-integration"),
1267 not(feature = "actix-web-v1-integration"),
1268 not(feature = "actix-web-v2-integration"),
1269 not(feature = "actix-web-v4-integration")
1270 ))]
1271 pub fn into_web_service(
1275 self: Arc<Self>,
1276 ) -> impl actix_service_v1::ServiceFactory<
1277 Request = actix_web_v3::dev::ServiceRequest,
1278 Response = actix_web_v3::dev::ServiceResponse,
1279 Error = actix_web_v3::Error,
1280 Config = (),
1281 InitError = (),
1282 > {
1283 self.into_actix_web_service()
1284 }
1285
1286 #[cfg(all(
1287 feature = "actix-web-v4-integration",
1288 not(feature = "hyper-integration"),
1289 not(feature = "actix-web-v1-integration"),
1290 not(feature = "actix-web-v2-integration"),
1291 not(feature = "actix-web-v3-integration")
1292 ))]
1293 pub fn into_web_service(
1297 self: Arc<Self>,
1298 ) -> impl actix_service_v2::ServiceFactory<
1299 actix_web_v4::dev::ServiceRequest,
1300 Response = actix_web_v4::dev::ServiceResponse,
1301 Error = actix_web_v4::Error,
1302 Config = (),
1303 InitError = (),
1304 > {
1305 self.into_actix_web_service()
1306 }
1307
1308 #[cfg(all(
1312 feature = "hyper-integration",
1313 not(feature = "actix-web-v1-integration"),
1314 not(feature = "actix-web-v2-integration"),
1315 not(feature = "actix-web-v3-integration"),
1316 not(feature = "actix-web-v4-integration")
1317 ))]
1318 pub fn into_web_service(self: Arc<Self>) -> Hyper<R> {
1319 self.into_hyper_web_service()
1320 }
1321}
1322
1323#[cfg(feature = "hyper-integration")]
1324pub struct Hyper<R>(pub(crate) Arc<Server<R>>);
1325
1326#[cfg(feature = "hyper-integration")]
1327impl<R> hyper::service::Service<hyper::Request<hyper::Body>> for Hyper<R>
1328where
1329 R: Router + Send + Sync + 'static,
1330{
1331 type Response = hyper::Response<hyper::Body>;
1332 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
1333 type Future =
1334 std::pin::Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
1335
1336 fn poll_ready(
1337 &mut self,
1338 _cx: &mut std::task::Context<'_>,
1339 ) -> std::task::Poll<Result<(), Self::Error>> {
1340 std::task::Poll::Ready(Ok(()))
1341 }
1342
1343 fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
1344 use hyper::body::HttpBody;
1345
1346 let service = Arc::clone(&self.0);
1347
1348 let rt = async move {
1349 let mut buf = if let Some(content_length) = req
1350 .headers()
1351 .get(hyper::header::CONTENT_LENGTH)
1352 .and_then(|x| x.to_str().ok())
1353 .and_then(|x| x.parse().ok())
1354 {
1355 bytes_v10::BytesMut::with_capacity(content_length)
1356 } else {
1357 bytes_v10::BytesMut::default()
1358 };
1359
1360 let mut body = req.into_body();
1361
1362 while let Some(chunk) = body.data().await {
1363 buf.extend(chunk?);
1364 }
1365
1366 match service.handle_bytes(buf.freeze()).await {
1367 ResponseObjects::Empty => hyper::Response::builder()
1368 .status(hyper::StatusCode::NO_CONTENT)
1369 .body(hyper::Body::from(Vec::<u8>::new()))
1370 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
1371 json => serde_json::to_vec(&json)
1372 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1373 .and_then(|json| {
1374 hyper::Response::builder()
1375 .status(hyper::StatusCode::OK)
1376 .header("Content-Type", "application/json")
1377 .body(hyper::Body::from(json))
1378 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
1379 }),
1380 }
1381 };
1382 Box::pin(rt)
1383 }
1384}
1385
1386#[cfg(feature = "hyper-integration")]
1387impl<'a, R> tower_service::Service<&'a hyper::server::conn::AddrStream> for Hyper<R> {
1388 type Response = Hyper<R>;
1389 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
1390 type Future = future::Ready<Result<Self::Response, Self::Error>>;
1391
1392 fn poll_ready(
1393 &mut self,
1394 _cx: &mut std::task::Context<'_>,
1395 ) -> std::task::Poll<Result<(), Self::Error>> {
1396 std::task::Poll::Ready(Ok(()))
1397 }
1398
1399 fn call(&mut self, _: &'a hyper::server::conn::AddrStream) -> Self::Future {
1400 future::ready(Ok(Hyper(Arc::clone(&self.0))))
1401 }
1402}