micro_tower/api/
service.rs

1use std::marker::PhantomData;
2use std::task::{Context, Poll};
3
4use bytes::{Buf, BufMut, BytesMut};
5
6use super::codec::{Decode, Encode};
7use super::{Error, Message};
8use crate::util::{BoxError, BoxFuture};
9
/// API service which translates bytes to requests of type `R` and responses to bytes.
///
/// # Type parameters
/// - `R` Request type the incoming bytes are decoded into and handed to the inner service.
/// - `C` Codec used to decode requests and to encode the reply messages.
/// - `S` Inner service wrapped by the API layer.
pub struct Service<R, C, S> {
    /// Service that receives the decoded requests.
    inner: S,
    /// Ties the otherwise unused `C` and `R` parameters to the type; stores no data.
    _p: PhantomData<(C, R)>,
}
15
16impl<R, C, S> Service<R, C, S> {
17    /// Creates new api layer by wrapping inner service
18    ///
19    /// # Parameters
20    /// - `inner` Service wrapped by API layer.
21    pub fn from_service(inner: S) -> Self {
22        Self {
23            inner,
24            _p: PhantomData,
25        }
26    }
27}
28
29impl<R, C, S> tower::Service<BytesMut> for Service<R, C, S>
30where
31    S: tower::Service<R, Error = BoxError>,
32    S::Future: Send + 'static,
33    C: Decode<R> + Encode<Message<S::Response>>,
34    <C as Encode<Message<S::Response>>>::Error: std::error::Error + Send + Sync + 'static,
35    <C as Decode<R>>::Error: Unpin + std::error::Error + Send + Sync + 'static,
36{
37    type Response = bytes::BytesMut;
38    type Error = Error;
39    type Future = BoxFuture<Result<Self::Response, Self::Error>>;
40
41    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42        match self.inner.poll_ready(cx) {
43            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
44            Poll::Ready(Err(_)) => todo!(),
45            Poll::Pending => Poll::Pending,
46        }
47    }
48
49    fn call(&mut self, buf: BytesMut) -> Self::Future {
50        let mut reader = buf.reader();
51        match C::decode(&mut reader) {
52            Ok(request) => {
53                let buf = reader.into_inner();
54                let fut = self.inner.call(request);
55                Box::pin(async move {
56                    match fut.await {
57                        Ok(response) => {
58                            let message = Message::Ok { data: response };
59                            let mut writer = buf.writer();
60                            if let Err(err) = C::encode(&mut writer, message) {
61                                let err = Error {
62                                    buf: writer.into_inner(),
63                                    err: Box::new(err),
64                                };
65                                return Err(err);
66                            }
67                            Ok(writer.into_inner())
68                        }
69                        Err(err) => {
70                            let message = Message::InternalServerError;
71                            let mut writer = buf.writer();
72                            C::encode(&mut writer, message).unwrap();
73                            let err = Error {
74                                buf: writer.into_inner(),
75                                err,
76                            };
77                            Err(err)
78                        }
79                    }
80                })
81            }
82            Err(err) => {
83                let message = Message::BadRequest;
84                let mut buf = reader.into_inner();
85                buf.clear();
86                let mut writer = buf.writer();
87                C::encode(&mut writer, message).unwrap();
88                let buf = writer.into_inner();
89                let err = Error {
90                    buf,
91                    err: Box::new(err),
92                };
93                Box::pin(async move { Err(err) })
94            }
95        }
96    }
97}
98
99impl<R, C, S: Clone> Clone for Service<R, C, S> {
100    fn clone(&self) -> Self {
101        Self {
102            inner: self.inner.clone(),
103            _p: PhantomData,
104        }
105    }
106}