databend_client/
presign.rs1use std::{collections::BTreeMap, path::Path};
16
17use log::info;
18use reqwest::{Body, Client as HttpClient, StatusCode};
19use tokio::io::AsyncRead;
20use tokio::io::AsyncWriteExt;
21use tokio_stream::StreamExt;
22use tokio_util::io::ReaderStream;
23
24use crate::error::{Error, Result};
25
26pub type Reader = Box<dyn AsyncRead + Send + Sync + Unpin + 'static>;
27
28#[derive(Debug, Clone, Copy)]
29pub enum PresignMode {
30 Auto,
31 Detect,
32 On,
33 Off,
34}
35
36pub struct PresignedResponse {
37 pub method: String,
38 pub headers: BTreeMap<String, String>,
39 pub url: String,
40}
41
42pub async fn presign_upload_to_stage(
43 presigned: PresignedResponse,
44 data: Reader,
45 size: u64,
46) -> Result<()> {
47 info!("upload to stage with presigned url, size: {}", size);
48 let client = HttpClient::new();
49 let mut builder = client.put(presigned.url);
50 for (k, v) in presigned.headers {
51 if k.to_lowercase() == "content-length" {
52 continue;
53 }
54 builder = builder.header(k, v);
55 }
56 builder = builder.header("Content-Length", size.to_string());
57 let stream = Body::wrap_stream(ReaderStream::new(data));
58 let resp = builder.body(stream).send().await?;
59 let status = resp.status();
60 let body = resp.bytes().await?;
61 match status {
62 StatusCode::OK => Ok(()),
63 _ => Err(Error::IO(format!(
64 "Upload with presigned url failed: {}",
65 String::from_utf8_lossy(&body)
66 ))),
67 }
68}
69
70pub async fn presign_download_from_stage(
71 presigned: PresignedResponse,
72 local_path: &Path,
73) -> Result<u64> {
74 if let Some(p) = local_path.parent() {
75 tokio::fs::create_dir_all(p).await?;
76 }
77 let client = HttpClient::new();
78 let mut builder = client.get(presigned.url);
79 for (k, v) in presigned.headers {
80 builder = builder.header(k, v);
81 }
82
83 let resp = builder.send().await?;
84 let status = resp.status();
85 match status {
86 StatusCode::OK => {
87 let mut file = tokio::fs::File::create(local_path).await?;
88 let mut body = resp.bytes_stream();
89 while let Some(chunk) = body.next().await {
90 file.write_all(&chunk?).await?;
91 }
92 file.flush().await?;
93 let metadata = file.metadata().await?;
94 Ok(metadata.len())
95 }
96 _ => Err(Error::IO(format!(
97 "Download with presigned url failed: {}",
98 status
99 ))),
100 }
101}