body_image_futio/
fetch.rs1use std::future::Future;
2
3use bytes::Bytes;
4use futures_util::{
5 future::{Either, FutureExt as _, TryFutureExt},
6 stream::{StreamExt, TryStreamExt},
7};
8
9use hyperx::header::{ContentLength, TypedHeaders};
10
11use body_image::{BodySink, Dialog};
12
13use crate::{
14 AsyncBodySink, BlockingPolicy, DispatchBodySink, Flaw, FutioError,
15 FutioTunables, InDialog, Monolog, PermitBodySink, RequestRecord,
16 SinkWrapper,
17};
18
19pub fn fetch<B>(rr: RequestRecord<B>, tune: FutioTunables)
25 -> Result<Dialog, FutioError>
26 where B: http_body::Body + Send + 'static,
27 B::Data: Send + Unpin,
28 B::Error: Into<Flaw>
29{
30 let rt = tokio::runtime::Builder::new_multi_thread()
31 .worker_threads(2)
32 .max_blocking_threads(2)
33 .enable_io()
34 .enable_time()
35 .build()
36 .unwrap();
37
38 let connector = hyper_tls::HttpsConnector::new();
39 let client = hyper::Client::builder().build(connector);
40
41 let join = rt.spawn(request_dialog(&client, rr, tune));
42 rt.block_on(join)
43 .map_err(|e| FutioError::Other(Box::new(e)))?
44}
45
46pub fn request_dialog<CN, B>(
53 client: &hyper::Client<CN, B>,
54 rr: RequestRecord<B>,
55 tune: FutioTunables)
56 -> impl Future<Output=Result<Dialog, FutioError>> + Send + 'static
57 where CN: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
58 B: http_body::Body + Send + 'static,
59 B::Data: Send,
60 B::Error: Into<Flaw>
61{
62 let prolog = rr.prolog;
63
64 let futr = client
65 .request(rr.request)
66 .err_into::<FutioError>()
67 .map_ok(|response| Monolog { prolog, response });
68
69 let futr = if let Some(t) = tune.res_timeout() {
70 Either::Left(
71 tokio::time::timeout(t, futr).map(move |r| match r {
72 Ok(Ok(v)) => Ok(v),
73 Ok(Err(e)) => Err(e),
74 Err(_) => Err(FutioError::ResponseTimeout(t))
75 })
76 )
77 } else {
78 Either::Right(futr)
79 };
80
81 async move {
82 let monolog = futr .await?;
83
84 let body_timeout = tune.body_timeout();
85
86 let futr = resp_future(monolog, tune);
87
88 let futr = if let Some(t) = body_timeout {
89 Either::Left(
90 tokio::time::timeout(t, futr).map(move |r| match r {
91 Ok(Ok(v)) => Ok(v),
92 Ok(Err(e)) => Err(e),
93 Err(_) => Err(FutioError::BodyTimeout(t))
94 })
95 )
96 } else {
97 Either::Right(futr)
98 };
99
100 futr .await? .prepare()
101 }
102}
103
104async fn resp_future(monolog: Monolog, tune: FutioTunables)
105 -> Result<InDialog, FutioError>
106{
107 let (resp_parts, body) = monolog.response.into_parts();
108
109 let bsink = match resp_parts.headers.try_decode::<ContentLength>() {
111 Some(Ok(ContentLength(l))) => {
112 if l > tune.image().max_body() {
113 Err(FutioError::ContentLengthTooLong(l))
114 } else if l > tune.image().max_body_ram() {
115 BodySink::with_fs(tune.image().temp_dir())
116 .map_err(FutioError::from)
117 } else {
118 Ok(BodySink::with_ram(l))
119 }
120 },
121 Some(Err(e)) => Err(FutioError::Other(Box::new(e))),
122 None => Ok(BodySink::with_ram(tune.image().max_body_ram()))
123 }?;
124
125 let body = body.err_into::<FutioError>();
130
131 let res_body = match tune.blocking_policy() {
132 BlockingPolicy::Direct => {
133 let mut sink = AsyncBodySink::<Bytes>::new(bsink, tune);
134 body.forward(&mut sink)
135 .await?;
136 sink.into_inner()
137 }
138 BlockingPolicy::Permit(_) => {
139 let mut sink = PermitBodySink::<Bytes>::new(bsink, tune);
140 body.forward(&mut sink)
141 .await?;
142 sink.into_inner()
143 }
144 BlockingPolicy::Dispatch => {
145 let mut sink = DispatchBodySink::<Bytes>::new(bsink, tune);
146 body.forward(&mut sink)
147 .await?;
148 sink.into_inner()
149 }
150 };
151
152 Ok(InDialog {
153 prolog: monolog.prolog,
154 version: resp_parts.version,
155 status: resp_parts.status,
156 res_headers: resp_parts.headers,
157 res_body
158 })
159}