1use crate::*;
2use anyhow::Context;
3use tokio::io::AsyncSeekExt;
4
5pub trait AsyncWriteUnpin: tokio::io::AsyncWrite + std::marker::Unpin + Send + Sync {}
6
7impl<T> AsyncWriteUnpin for T
8where T: tokio::io::AsyncWrite + std::marker::Unpin + Send + Sync {}
9
10fn body_to_str(b: hyper::body::Bytes) -> String {
11 String::from_utf8(b.to_vec()).unwrap_or("[UTF-8 decode failed]".into())
12}
13
14#[derive(Debug, Serialize)]
16pub struct EmptyRequest {}
17
18#[derive(Debug, Deserialize, Clone, Default)]
20pub struct EmptyResponse {}
21
22#[derive(Debug, PartialEq)]
24pub enum DownloadResult<T: DeserializeOwned + std::fmt::Debug> {
25 Downloaded,
27 Response(T),
29}
30
31pub async fn do_request<
33 Req: Serialize + std::fmt::Debug,
34 Resp: DeserializeOwned + Clone + Default,
35>(
36 cl: &TlsClient,
37 path: &str,
38 headers: &[(hyper::header::HeaderName, String)],
39 http_method: &str,
40 rq: Option<Req>,
41) -> Result<Resp> {
42 use futures::future::FutureExt;
43 do_request_with_headers(cl, path, headers, http_method, rq)
44 .map(|r| r.map(|t| t.0))
45 .await
46}
47
48pub async fn do_request_with_headers<
51 Req: Serialize + std::fmt::Debug,
52 Resp: DeserializeOwned + Clone + Default,
53>(
54 cl: &TlsClient,
55 path: &str,
56 headers: &[(hyper::header::HeaderName, String)],
57 http_method: &str,
58 rq: Option<Req>,
59) -> Result<(Resp, hyper::HeaderMap)> {
60 let mut reqb = hyper::Request::builder().uri(path).method(http_method);
61 for (k, v) in headers {
62 reqb = reqb.header(k, v);
63 }
64 reqb = reqb.header("Content-Type", "application/json");
65 let body_str;
66 if let Some(rq) = rq {
67 body_str = serde_json::to_string(&rq).context(format!("{:?}", rq))?;
68 } else {
69 body_str = "".to_string();
70 }
71
72 let body;
73 if body_str == "null" {
74 body = hyper::Body::from("");
75 } else {
76 body = hyper::Body::from(body_str);
77 }
78
79 let http_request = reqb.body(body)?;
80
81 debug!("do_request: Launching HTTP request: {:?}", http_request);
82
83 let http_response = cl.request(http_request).await?;
84 let status = http_response.status();
85
86 debug!(
87 "do_request: HTTP response with status {} received: {:?}",
88 status, http_response
89 );
90
91 let headers = http_response.headers().clone();
92 let response_body = hyper::body::to_bytes(http_response.into_body()).await?;
93 if !status.is_success() {
94 Err(ApiError::HTTPResponseError(status, body_to_str(response_body)).into())
95 } else {
96 if response_body.len() > 0 {
98 serde_json::from_reader(response_body.as_ref())
99 .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body)))
100 .map(|r| (r, headers))
101 } else {
102 Ok((Default::default(), headers))
103 }
104 }
105}
106
107pub async fn do_upload_multipart<
109 Req: Serialize + std::fmt::Debug,
110 Resp: DeserializeOwned + Clone,
111>(
112 cl: &TlsClient,
113 path: &str,
114 headers: &[(hyper::header::HeaderName, String)],
115 http_method: &str,
116 req: Option<Req>,
117 data: hyper::body::Bytes,
118) -> Result<Resp> {
119 let mut reqb = hyper::Request::builder().uri(path).method(http_method);
120 for (k, v) in headers {
121 reqb = reqb.header(k, v);
122 }
123
124 let data = multipart::format_multipart(&req, data)?;
125 reqb = reqb.header("Content-Length", data.as_ref().len());
126 reqb = reqb.header(
127 "Content-Type",
128 format!("multipart/related; boundary={}", multipart::MIME_BOUNDARY),
129 );
130
131 let body = hyper::Body::from(data.as_ref().to_vec());
132 let http_request = reqb.body(body)?;
133 debug!(
134 "do_upload_multipart: Launching HTTP request: {:?}",
135 http_request
136 );
137 let http_response = cl.request(http_request).await?;
138 let status = http_response.status();
139 debug!(
140 "do_upload_multipart: HTTP response with status {} received: {:?}",
141 status, http_response
142 );
143 let response_body = hyper::body::to_bytes(http_response.into_body()).await?;
144
145 if !status.is_success() {
146 Err(ApiError::HTTPResponseError(status, body_to_str(response_body)).into())
147 } else {
148 serde_json::from_reader(response_body.as_ref())
149 .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body)))
150 }
151}
152
153pub struct Download<'a, Request, Response> {
161 cl: &'a TlsClient,
162 http_method: String,
163 uri: hyper::Uri,
164 rq: Option<&'a Request>,
165 headers: Vec<(hyper::header::HeaderName, String)>,
166
167 _marker: std::marker::PhantomData<Response>,
168}
169
170impl<'a, Request: Serialize + std::fmt::Debug, Response: DeserializeOwned + std::fmt::Debug>
171 Download<'a, Request, Response>
172{
173 pub async fn do_it_to_buf(&mut self, buf: &mut Vec<u8>) -> Result<DownloadResult<Response>> {
175 self.do_it(Some(buf)).await
176 }
177
178 pub async fn do_it(
188 &mut self,
189 dst: Option<&mut (dyn AsyncWriteUnpin)>,
190 ) -> Result<DownloadResult<Response>> {
191 use std::str::FromStr;
192
193 let mut http_response;
194 let mut n_redirects = 0;
195 let mut uri = self.uri.clone();
196
197 loop {
199 let mut reqb = hyper::Request::builder()
200 .uri(&uri)
201 .method(self.http_method.as_str());
202 for (k, v) in self.headers.iter() {
203 reqb = reqb.header(k, v);
204 }
205
206 let body;
207 if let Some(rq) = self.rq.take() {
208 body = hyper::Body::from(
209 serde_json::to_string(&rq).context(format!("{:?}", self.rq))?,
210 );
211 } else {
212 body = hyper::Body::from("");
213 }
214
215 let http_request = reqb.body(body)?;
216 debug!(
217 "Download::do_it: Redirect {}, Launching HTTP request: {:?}",
218 n_redirects, http_request
219 );
220
221 http_response = Some(self.cl.request(http_request).await?);
222 let status = http_response.as_ref().unwrap().status();
223 debug!(
224 "Download::do_it: Redirect {}, HTTP response with status {} received: {:?}",
225 n_redirects, status, http_response
226 );
227
228 if status.is_success() {
230 let headers = http_response.as_ref().unwrap().headers();
231
232 if let Some(ct) = headers.get(hyper::header::CONTENT_TYPE) {
234 if ct.to_str()?.contains("application/json") {
235 let response_body =
236 hyper::body::to_bytes(http_response.unwrap().into_body()).await?;
237 return serde_json::from_reader(response_body.as_ref())
238 .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body)))
239 .map(DownloadResult::Response);
240 }
241 }
242
243 if let Some(dst) = dst {
244 use tokio::io::AsyncWriteExt;
245 let mut response_body = http_response.unwrap().into_body();
246 while let Some(chunk) = tokio_stream::StreamExt::next(&mut response_body).await
247 {
248 let chunk = chunk?;
249 dst.write(chunk.as_ref()).await?;
252 }
253 return Ok(DownloadResult::Downloaded);
254 } else {
255 return Err(ApiError::DataAvailableError(format!(
256 "No `dst` was supplied to download data to. Content-Type: {:?}",
257 headers.get(hyper::header::CONTENT_TYPE)
258 ))
259 .into());
260 }
261
262 } else if status.is_redirection() {
264 n_redirects += 1;
265 let new_location = http_response
266 .as_ref()
267 .unwrap()
268 .headers()
269 .get(hyper::header::LOCATION);
270 if new_location.is_none() {
271 return Err(ApiError::RedirectError(format!(
272 "Redirect doesn't contain a Location: header"
273 ))
274 .into());
275 }
276 uri = hyper::Uri::from_str(new_location.unwrap().to_str()?)?;
277 continue;
278 } else if !status.is_success() {
279 return Err(ApiError::HTTPResponseError(
280 status,
281 body_to_str(hyper::body::to_bytes(http_response.unwrap().into_body()).await?),
282 )
283 .into());
284 }
285
286 if n_redirects > 5 {
288 return Err(ApiError::HTTPTooManyRedirectsError.into());
289 }
290 }
291 }
292}
293
294pub async fn do_download<
295 'a,
296 Req: Serialize + std::fmt::Debug,
297 Resp: DeserializeOwned + std::fmt::Debug,
298>(
299 cl: &'a TlsClient,
300 path: &str,
301 headers: Vec<(hyper::header::HeaderName, String)>,
302 http_method: String,
303 rq: Option<&'a Req>,
304) -> Result<Download<'a, Req, Resp>> {
305 use std::str::FromStr;
306 Ok(Download {
307 cl: cl,
308 http_method: http_method,
309 uri: hyper::Uri::from_str(path)?,
310 rq: rq,
311 headers: headers,
312 _marker: Default::default(),
313 })
314}
315
316pub struct ResumableUpload<'client, Response: DeserializeOwned> {
318 dest: hyper::Uri,
319 cl: &'client TlsClient,
320 max_chunksize: usize,
321 _resp: std::marker::PhantomData<Response>,
322}
323
/// Builds a `Content-Range` header value of the form `bytes <from>-<to>/<total>`.
fn format_content_range(from: usize, to: usize, total: usize) -> String {
    let mut header = String::from("bytes ");
    header.push_str(&from.to_string());
    header.push('-');
    header.push_str(&to.to_string());
    header.push('/');
    header.push_str(&total.to_string());
    header
}
327
/// Parses a range header value of the form `bytes=<first>-<last>`.
///
/// Returns `Some((first, last))` when the value starts with `bytes=` and both
/// bounds are decimal numbers that fit into `usize`. Anything after a second
/// `-` is ignored.
///
/// Returns `None` when the prefix is missing, when there is no `-`, or when
/// either bound is not a number. An unparsable bound is deliberately *not*
/// mapped to `0`, because `0` is a valid byte offset and callers would act on it.
fn parse_response_range(rng: &str) -> Option<(usize, usize)> {
    let mut parts = rng.strip_prefix("bytes=")?.split('-');
    let first = parts.next()?.parse::<usize>().ok()?;
    let last = parts.next()?.parse::<usize>().ok()?;
    Some((first, last))
}
343
344impl<'client, Response: DeserializeOwned> ResumableUpload<'client, Response> {
345 pub fn new(
346 to: hyper::Uri,
347 cl: &'client TlsClient,
348 max_chunksize: usize,
349 ) -> ResumableUpload<'client, Response> {
350 ResumableUpload {
351 dest: to,
352 cl: cl,
353 max_chunksize: max_chunksize,
354 _resp: Default::default(),
355 }
356 }
357 pub fn set_max_chunksize(&mut self, size: usize) -> Result<&mut Self> {
358 if size % (1024 * 256) != 0 {
359 Err(ApiError::InputDataError(
360 "ResumableUpload: max_chunksize must be multiple of 256 KiB.".into(),
361 )
362 .into())
363 } else {
364 self.max_chunksize = size;
365 Ok(self)
366 }
367 }
368
369 pub async fn upload<R: tokio::io::AsyncRead + std::marker::Unpin>(
372 &self,
373 mut f: R,
374 size: usize,
375 ) -> Result<Response> {
376 use tokio::io::AsyncReadExt;
377
378 let mut current = 0;
380 let mut previously_unsent = None;
382 loop {
383 let chunksize = if (size - current) > self.max_chunksize {
384 self.max_chunksize
385 } else {
386 size - current
387 };
388
389 let mut buf: Vec<u8>;
390 let read_from_stream;
391 if let Some(buf2) = previously_unsent.take() {
392 buf = buf2;
393 read_from_stream = buf.len();
394 } else {
395 buf = vec![0 as u8; chunksize];
396 read_from_stream = f.read_exact(&mut buf).await?;
398 buf.resize(read_from_stream, 0);
399 }
400
401 let reqb = hyper::Request::builder()
402 .uri(self.dest.clone())
403 .method(hyper::Method::PUT)
404 .header(hyper::header::CONTENT_LENGTH, read_from_stream)
405 .header(
406 hyper::header::CONTENT_RANGE,
407 format_content_range(current, current + read_from_stream - 1, size),
408 )
409 .header(hyper::header::CONTENT_TYPE, "application/octet-stream");
410 let request = reqb.body(hyper::Body::from(buf[..].to_vec()))?;
411 debug!("upload_file: Launching HTTP request: {:?}", request);
412
413 let response = self.cl.request(request).await?;
414 debug!("upload_file: Received response: {:?}", response);
415
416 let status = response.status();
417 if !status.is_success() && status.as_u16() != 308 {
419 debug!("upload_file: Encountered error: {}", status);
420 return Err(ApiError::HTTPResponseError(status, status.to_string())).context(
421 body_to_str(hyper::body::to_bytes(response.into_body()).await?),
422 );
423 }
424
425 let sent;
426 if let Some(rng) = response.headers().get(hyper::header::RANGE) {
427 if let Some((_, to)) = parse_response_range(rng.to_str()?) {
428 sent = to + 1 - current;
429 if sent < read_from_stream {
430 previously_unsent = Some(buf.split_off(sent));
431 }
432 current = to + 1;
433 } else {
434 sent = read_from_stream;
435 current += read_from_stream;
436 }
437 } else {
438 sent = read_from_stream;
439 current += read_from_stream;
440 }
441
442 debug!(
443 "upload_file: Sent {} bytes (successful: {}) of total {} to {}",
444 chunksize, sent, size, self.dest
445 );
446
447 if current >= size {
448 let headers = response.headers().clone();
449 let response_body = hyper::body::to_bytes(response.into_body()).await?;
450
451 if !status.is_success() {
452 return Err(Error::from(ApiError::HTTPResponseError(
453 status,
454 body_to_str(response_body),
455 ))
456 .context(format!("{:?}", headers)));
457 } else {
458 return serde_json::from_reader(response_body.as_ref()).map_err(|e| {
459 anyhow::Error::from(e)
460 .context(body_to_str(response_body))
461 .context(format!("{:?}", headers))
462 });
463 }
464 }
465 }
466 }
467 pub async fn upload_file(&self, mut f: tokio::fs::File) -> Result<Response> {
470 use tokio::io::AsyncReadExt;
471
472 let len = f.metadata().await?.len() as usize;
473 let mut current = 0;
474 loop {
475 let chunksize = if (len - current) > self.max_chunksize {
476 self.max_chunksize
477 } else {
478 len - current
479 };
480
481 f.seek(std::io::SeekFrom::Start(current as u64)).await?;
482
483 let mut buf = vec![0 as u8; chunksize];
484 let read_from_stream = f.read_exact(&mut buf).await?;
486 buf.resize(read_from_stream, 0);
487
488 let reqb = hyper::Request::builder()
489 .uri(self.dest.clone())
490 .method(hyper::Method::PUT)
491 .header(hyper::header::CONTENT_LENGTH, read_from_stream)
492 .header(
493 hyper::header::CONTENT_RANGE,
494 format_content_range(current, current + read_from_stream - 1, len),
495 )
496 .header(hyper::header::CONTENT_TYPE, "application/octet-stream");
497 let request = reqb.body(hyper::Body::from(buf))?;
498 debug!("upload_file: Launching HTTP request: {:?}", request);
499
500 let response = self.cl.request(request).await?;
501 debug!("upload_file: Received response: {:?}", response);
502
503 let status = response.status();
504 if !status.is_success() && status.as_u16() != 308 {
506 debug!("upload_file: Encountered error: {}", status);
507 return Err(ApiError::HTTPResponseError(status, status.to_string())).context(
508 body_to_str(hyper::body::to_bytes(response.into_body()).await?),
509 );
510 }
511
512 let sent;
513 if let Some(rng) = response.headers().get(hyper::header::RANGE) {
514 if let Some((_, to)) = parse_response_range(rng.to_str()?) {
515 sent = to + 1 - current;
516 current = to + 1;
517 } else {
518 sent = read_from_stream;
519 current += read_from_stream;
520 }
521 } else {
522 sent = read_from_stream;
524 current += read_from_stream;
525 }
526
527 debug!(
528 "upload_file: Sent {} bytes (successful: {}) of total {} to {}",
529 chunksize, sent, len, self.dest
530 );
531
532 if current >= len {
533 let headers = response.headers().clone();
534 let response_body = hyper::body::to_bytes(response.into_body()).await?;
535
536 if !status.is_success() {
537 return Err(Error::from(ApiError::HTTPResponseError(
538 status,
539 body_to_str(response_body),
540 ))
541 .context(format!("{:?}", headers)));
542 } else {
543 return serde_json::from_reader(response_body.as_ref()).map_err(|e| {
544 anyhow::Error::from(e)
545 .context(body_to_str(response_body))
546 .context(format!("{:?}", headers))
547 });
548 }
549 }
550 }
551 }
552}