micro_tower/api/
service.rs1use std::marker::PhantomData;
2use std::task::{Context, Poll};
3
4use bytes::{Buf, BufMut, BytesMut};
5
6use super::codec::{Decode, Encode};
7use super::{Error, Message};
8use crate::util::{BoxError, BoxFuture};
9
10pub struct Service<R, C, S> {
12 inner: S,
13 _p: PhantomData<(C, R)>,
14}
15
16impl<R, C, S> Service<R, C, S> {
17 pub fn from_service(inner: S) -> Self {
22 Self {
23 inner,
24 _p: PhantomData,
25 }
26 }
27}
28
29impl<R, C, S> tower::Service<BytesMut> for Service<R, C, S>
30where
31 S: tower::Service<R, Error = BoxError>,
32 S::Future: Send + 'static,
33 C: Decode<R> + Encode<Message<S::Response>>,
34 <C as Encode<Message<S::Response>>>::Error: std::error::Error + Send + Sync + 'static,
35 <C as Decode<R>>::Error: Unpin + std::error::Error + Send + Sync + 'static,
36{
37 type Response = bytes::BytesMut;
38 type Error = Error;
39 type Future = BoxFuture<Result<Self::Response, Self::Error>>;
40
41 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42 match self.inner.poll_ready(cx) {
43 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
44 Poll::Ready(Err(_)) => todo!(),
45 Poll::Pending => Poll::Pending,
46 }
47 }
48
49 fn call(&mut self, buf: BytesMut) -> Self::Future {
50 let mut reader = buf.reader();
51 match C::decode(&mut reader) {
52 Ok(request) => {
53 let buf = reader.into_inner();
54 let fut = self.inner.call(request);
55 Box::pin(async move {
56 match fut.await {
57 Ok(response) => {
58 let message = Message::Ok { data: response };
59 let mut writer = buf.writer();
60 if let Err(err) = C::encode(&mut writer, message) {
61 let err = Error {
62 buf: writer.into_inner(),
63 err: Box::new(err),
64 };
65 return Err(err);
66 }
67 Ok(writer.into_inner())
68 }
69 Err(err) => {
70 let message = Message::InternalServerError;
71 let mut writer = buf.writer();
72 C::encode(&mut writer, message).unwrap();
73 let err = Error {
74 buf: writer.into_inner(),
75 err,
76 };
77 Err(err)
78 }
79 }
80 })
81 }
82 Err(err) => {
83 let message = Message::BadRequest;
84 let mut buf = reader.into_inner();
85 buf.clear();
86 let mut writer = buf.writer();
87 C::encode(&mut writer, message).unwrap();
88 let buf = writer.into_inner();
89 let err = Error {
90 buf,
91 err: Box::new(err),
92 };
93 Box::pin(async move { Err(err) })
94 }
95 }
96 }
97}
98
99impl<R, C, S: Clone> Clone for Service<R, C, S> {
100 fn clone(&self) -> Self {
101 Self {
102 inner: self.inner.clone(),
103 _p: PhantomData,
104 }
105 }
106}