body_image_futio/
fetch.rs

1use std::future::Future;
2
3use bytes::Bytes;
4use futures_util::{
5    future::{Either, FutureExt as _, TryFutureExt},
6    stream::{StreamExt, TryStreamExt},
7};
8
9use hyperx::header::{ContentLength, TypedHeaders};
10
11use body_image::{BodySink, Dialog};
12
13use crate::{
14    AsyncBodySink, BlockingPolicy, DispatchBodySink, Flaw, FutioError,
15    FutioTunables, InDialog, Monolog, PermitBodySink, RequestRecord,
16    SinkWrapper,
17};
18
19/// Run an HTTP request to completion, returning the full `Dialog`.
20///
21/// This function constructs a tokio `Runtime` (ThreadPool),
22/// `hyper_tls::HttpsConnector`, and `hyper::Client` in a simplistic form
23/// internally, waiting with timeout, and dropping these on completion.
24pub fn fetch<B>(rr: RequestRecord<B>, tune: FutioTunables)
25    -> Result<Dialog, FutioError>
26    where B: http_body::Body + Send + 'static,
27          B::Data: Send + Unpin,
28          B::Error: Into<Flaw>
29{
30    let rt = tokio::runtime::Builder::new_multi_thread()
31        .worker_threads(2)
32        .max_blocking_threads(2)
33        .enable_io()
34        .enable_time()
35        .build()
36        .unwrap();
37
38    let connector = hyper_tls::HttpsConnector::new();
39    let client = hyper::Client::builder().build(connector);
40
41    let join = rt.spawn(request_dialog(&client, rr, tune));
42    rt.block_on(join)
43        .map_err(|e| FutioError::Other(Box::new(e)))?
44}
45
46/// Given a suitable `hyper::Client` and `RequestRecord`, return a
47/// `Future` with `Dialog` output.
48///
49/// The provided `FutioTunables` governs timeout intervals (initial response
50/// and complete body), if the response `BodyImage` will be in `Ram` or
51/// `FsRead`, and `BlockingPolicy`.
52pub fn request_dialog<CN, B>(
53    client: &hyper::Client<CN, B>,
54    rr: RequestRecord<B>,
55    tune: FutioTunables)
56    -> impl Future<Output=Result<Dialog, FutioError>> + Send + 'static
57    where CN: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
58          B: http_body::Body + Send + 'static,
59          B::Data: Send,
60          B::Error: Into<Flaw>
61{
62    let prolog = rr.prolog;
63
64    let futr = client
65        .request(rr.request)
66        .err_into::<FutioError>()
67        .map_ok(|response| Monolog { prolog, response });
68
69    let futr = if let Some(t) = tune.res_timeout() {
70        Either::Left(
71            tokio::time::timeout(t, futr).map(move |r| match r {
72                Ok(Ok(v)) => Ok(v),
73                Ok(Err(e)) => Err(e),
74                Err(_) => Err(FutioError::ResponseTimeout(t))
75            })
76        )
77    } else {
78        Either::Right(futr)
79    };
80
81    async move {
82        let monolog = futr .await?;
83
84        let body_timeout = tune.body_timeout();
85
86        let futr = resp_future(monolog, tune);
87
88        let futr = if let Some(t) = body_timeout {
89            Either::Left(
90                tokio::time::timeout(t, futr).map(move |r| match r {
91                    Ok(Ok(v)) => Ok(v),
92                    Ok(Err(e)) => Err(e),
93                    Err(_) => Err(FutioError::BodyTimeout(t))
94                })
95            )
96        } else {
97            Either::Right(futr)
98        };
99
100        futr .await? .prepare()
101    }
102}
103
104async fn resp_future(monolog: Monolog, tune: FutioTunables)
105    -> Result<InDialog, FutioError>
106{
107    let (resp_parts, body) = monolog.response.into_parts();
108
109    // Result<BodySink> based on CONTENT_LENGTH header.
110    let bsink = match resp_parts.headers.try_decode::<ContentLength>() {
111        Some(Ok(ContentLength(l))) => {
112            if l > tune.image().max_body() {
113                Err(FutioError::ContentLengthTooLong(l))
114            } else if l > tune.image().max_body_ram() {
115                BodySink::with_fs(tune.image().temp_dir())
116                    .map_err(FutioError::from)
117            } else {
118                Ok(BodySink::with_ram(l))
119            }
120        },
121        Some(Err(e)) => Err(FutioError::Other(Box::new(e))),
122        None => Ok(BodySink::with_ram(tune.image().max_body_ram()))
123    }?;
124
125    // Regardless of policy, we always receive `Bytes` from hyper, so there is
126    // no advantage to converting to `UniBodyBuf` here. Memory mapped buffers
127    // are never received.
128
129    let body = body.err_into::<FutioError>();
130
131    let res_body = match tune.blocking_policy() {
132        BlockingPolicy::Direct => {
133            let mut sink = AsyncBodySink::<Bytes>::new(bsink, tune);
134            body.forward(&mut sink)
135                .await?;
136            sink.into_inner()
137        }
138        BlockingPolicy::Permit(_) => {
139            let mut sink = PermitBodySink::<Bytes>::new(bsink, tune);
140            body.forward(&mut sink)
141                .await?;
142            sink.into_inner()
143        }
144        BlockingPolicy::Dispatch => {
145            let mut sink = DispatchBodySink::<Bytes>::new(bsink, tune);
146            body.forward(&mut sink)
147                .await?;
148            sink.into_inner()
149        }
150    };
151
152    Ok(InDialog {
153        prolog:      monolog.prolog,
154        version:     resp_parts.version,
155        status:      resp_parts.status,
156        res_headers: resp_parts.headers,
157        res_body
158    })
159}