proto_tower_http_1/client/
layer.rs

1use crate::client::parser::parse_http1_response;
2use crate::client::ProtoHttp1ClientConfig;
3use crate::data::request::HTTP1Request;
4use crate::data::HTTP1ClientResponse;
5use proto_tower_util::{AsyncReadToBuf, WriteTo, ZeroReadBehaviour};
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use tokio::io::{ReadHalf, SimplexStream, WriteHalf};
10use tower::Service;
11
12/// A service to process HTTP/1.1 requests
13///
14/// This should not be constructed directly - it gets created by MakeService during invocation.
15pub struct ProtoHttp1ClientLayer<Svc>
16where
17    Svc: Service<(ReadHalf<SimplexStream>, WriteHalf<SimplexStream>), Response = ()> + Send + Clone,
18{
19    config: ProtoHttp1ClientConfig,
20    /// The inner service to process requests
21    inner: Svc,
22}
23
24impl<Svc> ProtoHttp1ClientLayer<Svc>
25where
26    Svc: Service<(ReadHalf<SimplexStream>, WriteHalf<SimplexStream>), Response = ()> + Send + Clone,
27{
28    /// Create a new instance of the service
29    pub fn new(config: ProtoHttp1ClientConfig, inner: Svc) -> Self {
30        ProtoHttp1ClientLayer { config, inner }
31    }
32}
33
/// Errors reported by the HTTP/1.1 client layer.
///
/// `SvcError` is the error type of the wrapped (inner) service.
///
/// The type implements [`std::fmt::Display`] and, when `SvcError` is itself an
/// error, [`std::error::Error`], so it can be boxed into `Box<dyn Error>` and
/// used with error-reporting helpers.
#[derive(Debug)]
pub enum ProtoHttp1LayerError<SvcError> {
    /// An error in the implementation of this layer
    // Allowed to be unused so that the variant can exist without a lint warning.
    #[allow(dead_code)]
    Implementation(String),
    /// The internal service returned a wrong response
    InternalServiceWrongResponse,
    /// An error in the internal service
    InternalServiceError(SvcError),
}

impl<SvcError> std::fmt::Display for ProtoHttp1LayerError<SvcError>
where
    SvcError: std::fmt::Display,
{
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Implementation(msg) => write!(f, "implementation error: {}", msg),
            Self::InternalServiceWrongResponse => f.write_str("internal service returned a wrong response"),
            Self::InternalServiceError(err) => write!(f, "internal service error: {}", err),
        }
    }
}

impl<SvcError> std::error::Error for ProtoHttp1LayerError<SvcError>
where
    SvcError: std::error::Error + 'static,
{
    /// Exposes the inner service's error as the underlying cause, if there is one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InternalServiceError(err) => Some(err),
            Self::Implementation(_) | Self::InternalServiceWrongResponse => None,
        }
    }
}
44
45impl<Svc, SvcError, SvcFut> Service<HTTP1Request> for ProtoHttp1ClientLayer<Svc>
46where
47    Svc: Service<(ReadHalf<SimplexStream>, WriteHalf<SimplexStream>), Response = (), Error = SvcError, Future = SvcFut> + Send + Clone + 'static,
48    SvcFut: Future<Output = Result<(), SvcError>> + Send,
49{
50    /// The response is handled by the protocol
51    type Response = HTTP1ClientResponse<ReadHalf<SimplexStream>, WriteHalf<SimplexStream>>;
52    /// Errors would be failures in parsing the protocol - this should be handled by the protocol
53    type Error = ProtoHttp1LayerError<SvcError>;
54    /// The future is the protocol itself
55    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
56
57    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58        self.inner.poll_ready(cx).map_err(|e| ProtoHttp1LayerError::InternalServiceError(e))
59    }
60
61    /// Indefinitely process the protocol
62    fn call(&mut self, request: HTTP1Request) -> Self::Future {
63        let mut service = self.inner.clone();
64        let config = self.config.clone();
65        Box::pin(async move {
66            let (svc_read, mut write) = tokio::io::simplex(1024);
67            let (mut read, svc_write) = tokio::io::simplex(1024);
68            request.write_to(&mut write).await.unwrap();
69            service.call((svc_read, svc_write)).await.map_err(|e| ProtoHttp1LayerError::InternalServiceError(e))?;
70            let read_buf = AsyncReadToBuf::new_1024(ZeroReadBehaviour::TickAndYield);
71            let buf = read_buf.read_with_timeout(&mut read, config.timeout, None).await;
72            let resp = parse_http1_response(&buf).unwrap();
73            Ok(HTTP1ClientResponse::Response(resp))
74        })
75    }
76}