proto_tower_http_1/client/
layer.rs1use crate::client::parser::parse_http1_response;
2use crate::client::ProtoHttp1ClientConfig;
3use crate::data::request::HTTP1Request;
4use crate::data::HTTP1ClientResponse;
5use proto_tower_util::{AsyncReadToBuf, WriteTo, ZeroReadBehaviour};
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use tokio::io::{ReadHalf, SimplexStream, WriteHalf};
10use tower::Service;
11
12pub struct ProtoHttp1ClientLayer<Svc>
16where
17 Svc: Service<(ReadHalf<SimplexStream>, WriteHalf<SimplexStream>), Response = ()> + Send + Clone,
18{
19 config: ProtoHttp1ClientConfig,
20 inner: Svc,
22}
23
24impl<Svc> ProtoHttp1ClientLayer<Svc>
25where
26 Svc: Service<(ReadHalf<SimplexStream>, WriteHalf<SimplexStream>), Response = ()> + Send + Clone,
27{
28 pub fn new(config: ProtoHttp1ClientConfig, inner: Svc) -> Self {
30 ProtoHttp1ClientLayer { config, inner }
31 }
32}
33
/// Errors produced by the HTTP/1 client service.
///
/// `SvcError` is the error type of the wrapped transport service.
#[derive(Debug)]
pub enum ProtoHttp1LayerError<SvcError> {
    /// A failure inside this layer itself, described by a free-form message.
    // The variant is part of the public error surface even when no code path in this
    // crate constructs it, so the unused-variant lint is silenced on purpose.
    #[allow(dead_code)]
    Implementation(String),
    /// The inner service produced a response this layer did not expect.
    InternalServiceWrongResponse,
    /// The inner service failed; the original error is carried unchanged.
    InternalServiceError(SvcError),
}

impl<SvcError> std::fmt::Display for ProtoHttp1LayerError<SvcError>
where
    SvcError: std::fmt::Display,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Implementation(message) => write!(f, "implementation error: {message}"),
            Self::InternalServiceWrongResponse => {
                f.write_str("inner service returned an unexpected response")
            }
            Self::InternalServiceError(err) => write!(f, "inner service error: {err}"),
        }
    }
}

impl<SvcError> std::error::Error for ProtoHttp1LayerError<SvcError>
where
    SvcError: std::error::Error + 'static,
{
    /// Exposes the inner service error as the source so error chains stay intact.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InternalServiceError(err) => Some(err),
            Self::Implementation(_) | Self::InternalServiceWrongResponse => None,
        }
    }
}
44
45impl<Svc, SvcError, SvcFut> Service<HTTP1Request> for ProtoHttp1ClientLayer<Svc>
46where
47 Svc: Service<(ReadHalf<SimplexStream>, WriteHalf<SimplexStream>), Response = (), Error = SvcError, Future = SvcFut> + Send + Clone + 'static,
48 SvcFut: Future<Output = Result<(), SvcError>> + Send,
49{
50 type Response = HTTP1ClientResponse<ReadHalf<SimplexStream>, WriteHalf<SimplexStream>>;
52 type Error = ProtoHttp1LayerError<SvcError>;
54 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
56
57 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58 self.inner.poll_ready(cx).map_err(|e| ProtoHttp1LayerError::InternalServiceError(e))
59 }
60
61 fn call(&mut self, request: HTTP1Request) -> Self::Future {
63 let mut service = self.inner.clone();
64 let config = self.config.clone();
65 Box::pin(async move {
66 let (svc_read, mut write) = tokio::io::simplex(1024);
67 let (mut read, svc_write) = tokio::io::simplex(1024);
68 request.write_to(&mut write).await.unwrap();
69 service.call((svc_read, svc_write)).await.map_err(|e| ProtoHttp1LayerError::InternalServiceError(e))?;
70 let read_buf = AsyncReadToBuf::new_1024(ZeroReadBehaviour::TickAndYield);
71 let buf = read_buf.read_with_timeout(&mut read, config.timeout, None).await;
72 let resp = parse_http1_response(&buf).unwrap();
73 Ok(HTTP1ClientResponse::Response(resp))
74 })
75 }
76}