databend_client/
presign.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeMap, path::Path};
16
17use log::info;
18use reqwest::{Body, Client as HttpClient, StatusCode};
19use tokio::io::AsyncRead;
20use tokio::io::AsyncWriteExt;
21use tokio_stream::StreamExt;
22use tokio_util::io::ReaderStream;
23
24use crate::error::{Error, Result};
25
26pub type Reader = Box<dyn AsyncRead + Send + Sync + Unpin + 'static>;
27
28#[derive(Debug, Clone, Copy)]
29pub enum PresignMode {
30    Auto,
31    Detect,
32    On,
33    Off,
34}
35
36pub struct PresignedResponse {
37    pub method: String,
38    pub headers: BTreeMap<String, String>,
39    pub url: String,
40}
41
42pub async fn presign_upload_to_stage(
43    presigned: PresignedResponse,
44    data: Reader,
45    size: u64,
46) -> Result<()> {
47    info!("upload to stage with presigned url, size: {}", size);
48    let client = HttpClient::new();
49    let mut builder = client.put(presigned.url);
50    for (k, v) in presigned.headers {
51        if k.to_lowercase() == "content-length" {
52            continue;
53        }
54        builder = builder.header(k, v);
55    }
56    builder = builder.header("Content-Length", size.to_string());
57    let stream = Body::wrap_stream(ReaderStream::new(data));
58    let resp = builder.body(stream).send().await?;
59    let status = resp.status();
60    let body = resp.bytes().await?;
61    match status {
62        StatusCode::OK => Ok(()),
63        _ => Err(Error::IO(format!(
64            "Upload with presigned url failed: {}",
65            String::from_utf8_lossy(&body)
66        ))),
67    }
68}
69
70pub async fn presign_download_from_stage(
71    presigned: PresignedResponse,
72    local_path: &Path,
73) -> Result<u64> {
74    if let Some(p) = local_path.parent() {
75        tokio::fs::create_dir_all(p).await?;
76    }
77    let client = HttpClient::new();
78    let mut builder = client.get(presigned.url);
79    for (k, v) in presigned.headers {
80        builder = builder.header(k, v);
81    }
82
83    let resp = builder.send().await?;
84    let status = resp.status();
85    match status {
86        StatusCode::OK => {
87            let mut file = tokio::fs::File::create(local_path).await?;
88            let mut body = resp.bytes_stream();
89            while let Some(chunk) = body.next().await {
90                file.write_all(&chunk?).await?;
91            }
92            file.flush().await?;
93            let metadata = file.metadata().await?;
94            Ok(metadata.len())
95        }
96        _ => Err(Error::IO(format!(
97            "Download with presigned url failed: {}",
98            status
99        ))),
100    }
101}