use crate::auth::AccessToken;
use crate::errors::{NetDiskError, NetDiskResult};
use crate::http::HttpClient;
use futures::stream::{self, StreamExt};
use log::debug;
use serde::Deserialize;
#[derive(Debug, Clone)]
pub struct UploadClient {
http_client: HttpClient,
}
impl UploadClient {
pub fn new(http_client: HttpClient) -> Self {
Self { http_client }
}
pub fn http_client(&self) -> &HttpClient {
&self.http_client
}
pub async fn precreate(
&self,
access_token: &AccessToken,
options: PrecreateOptions,
) -> NetDiskResult<PrecreateResponse> {
let block_list_json =
serde_json::to_string(&options.block_list).map_err(|e| NetDiskError::Unknown {
message: format!("Failed to serialize block_list: {}", e),
})?;
let params = vec![
("method", "precreate"),
("access_token", access_token.access_token.as_str()),
];
let size_str = options.size.to_string();
let isdir_str = options.isdir.to_string();
let rtype_str = options.rtype.to_string();
let form_data = vec![
("path", options.path.as_str()),
("size", size_str.as_str()),
("isdir", isdir_str.as_str()),
("block_list", block_list_json.as_str()),
("autoinit", "1"),
("rtype", rtype_str.as_str()),
];
debug!(
"Precreate upload: path={}, size={}, isdir={}, block_list={:?}",
options.path, options.size, options.isdir, options.block_list
);
let response: PrecreateResponse = self
.http_client
.post_form("/rest/2.0/xpan/file", Some(&form_data), Some(¶ms))
.await?;
if response.errno != 0 {
let error_msg = get_error_message(response.errno);
return Err(NetDiskError::api_error(response.errno, &error_msg));
}
debug!(
"Precreate success: uploadid={}, block_list={:?}",
response.uploadid, response.block_list
);
Ok(response)
}
}
#[derive(Debug, Clone, Default)]
pub struct PrecreateOptions {
pub path: String,
pub size: u64,
pub isdir: i32,
pub block_list: Vec<String>,
pub rtype: i32,
pub uploadid: Option<String>,
pub content_md5: Option<String>,
pub slice_md5: Option<String>,
pub local_ctime: Option<u64>,
pub local_mtime: Option<u64>,
}
impl PrecreateOptions {
pub fn new(path: &str, size: u64, block_list: Vec<String>) -> Self {
Self {
path: path.to_string(),
size,
isdir: 0,
block_list,
rtype: 1,
uploadid: None,
content_md5: None,
slice_md5: None,
local_ctime: None,
local_mtime: None,
}
}
pub fn isdir(mut self, isdir: bool) -> Self {
self.isdir = if isdir { 1 } else { 0 };
self
}
pub fn rtype(mut self, rtype: i32) -> Self {
self.rtype = rtype;
self
}
pub fn uploadid(mut self, uploadid: &str) -> Self {
self.uploadid = Some(uploadid.to_string());
self
}
pub fn content_md5(mut self, md5: &str) -> Self {
self.content_md5 = Some(md5.to_string());
self
}
pub fn slice_md5(mut self, md5: &str) -> Self {
self.slice_md5 = Some(md5.to_string());
self
}
pub fn local_ctime(mut self, ctime: u64) -> Self {
self.local_ctime = Some(ctime);
self
}
pub fn local_mtime(mut self, mtime: u64) -> Self {
self.local_mtime = Some(mtime);
self
}
}
#[derive(Debug, Deserialize)]
pub struct PrecreateResponse {
pub errno: i32,
#[serde(default)]
pub path: Option<String>,
pub uploadid: String,
#[serde(rename = "return_type")]
pub return_type: i32,
#[serde(rename = "block_list")]
pub block_list: Vec<u32>,
}
impl UploadClient {
pub async fn upload_chunk(
&self,
access_token: &AccessToken,
options: UploadChunkOptions,
) -> NetDiskResult<UploadChunkResponse> {
let url = format!(
"https://c3.pcs.baidu.com/rest/2.0/pcs/superfile2?method=upload&access_token={}&type=tmpfile&path={}&uploadid={}&partseq={}",
urlencoding::encode(&access_token.access_token),
urlencoding::encode(&options.path),
urlencoding::encode(&options.uploadid),
options.partseq
);
debug!(
"Upload chunk: path={}, uploadid={}, partseq={}, data_size={}",
options.path,
options.uploadid,
options.partseq,
options.data.len()
);
let response: UploadChunkResponse = self
.http_client
.post_multipart(
&url,
"file".to_string(),
"chunk.dat".to_string(),
options.data,
)
.await?;
Ok(response)
}
pub async fn upload_chunks_parallel(
&self,
access_token: &AccessToken,
remote_path: &str,
uploadid: &str,
chunks: Vec<(u32, Vec<u8>)>,
max_concurrency: usize,
) -> NetDiskResult<Vec<(u32, String)>> {
debug!(
"Uploading {} chunks in parallel (max_concurrency: {})",
chunks.len(),
max_concurrency
);
let access_token_str = access_token.access_token.clone();
let remote_path_str = remote_path.to_string();
let uploadid_str = uploadid.to_string();
let http_client = self.http_client.clone();
let mut stream = stream::iter(chunks)
.map(|(partseq, data)| {
let path = remote_path_str.clone();
let uid = access_token_str.clone();
let upid = uploadid_str.clone();
let client = http_client.clone();
async move {
let url = format!(
"https://c3.pcs.baidu.com/rest/2.0/pcs/superfile2?method=upload&access_token={}&type=tmpfile&path={}&uploadid={}&partseq={}",
urlencoding::encode(&uid),
urlencoding::encode(&path),
urlencoding::encode(&upid),
partseq
);
debug!("Uploading chunk {} ({} bytes)", partseq, data.len());
let response: UploadChunkResponse = client
.post_multipart(&url, "file".to_string(), "chunk.dat".to_string(), data)
.await?;
Ok((partseq, response.md5))
}
})
.buffer_unordered(max_concurrency);
let mut chunk_md5s = Vec::new();
while let Some(result) = stream.next().await {
match result {
Ok((partseq, md5)) => {
chunk_md5s.push((partseq, md5));
}
Err(e) => {
return Err(e);
}
}
}
debug!("All chunks uploaded successfully");
Ok(chunk_md5s)
}
pub async fn create_file(
&self,
access_token: &AccessToken,
options: CreateFileOptions,
) -> NetDiskResult<CreateFileResponse> {
let block_list_json =
serde_json::to_string(&options.block_list).map_err(|e| NetDiskError::Unknown {
message: format!("Failed to serialize block_list: {}", e),
})?;
let params = vec![
("method", "create"),
("access_token", access_token.access_token.as_str()),
];
let size_str = options.size.to_string();
let isdir_str = options.isdir.to_string();
let rtype_str = options.rtype.to_string();
let mut form_data = vec![
("path", options.path.as_str()),
("size", size_str.as_str()),
("isdir", isdir_str.as_str()),
("block_list", block_list_json.as_str()),
("uploadid", options.uploadid.as_str()),
("rtype", rtype_str.as_str()),
];
let ctime_str = options.local_ctime.map(|t| t.to_string());
let mtime_str = options.local_mtime.map(|t| t.to_string());
if let Some(ref ctime) = ctime_str {
form_data.push(("local_ctime", ctime.as_str()));
}
if let Some(ref mtime) = mtime_str {
form_data.push(("local_mtime", mtime.as_str()));
}
debug!(
"Create file: path={}, size={}, isdir={}, uploadid={}",
options.path, options.size, options.isdir, options.uploadid
);
let response: CreateFileResponse = self
.http_client
.post_form("/rest/2.0/xpan/file", Some(&form_data), Some(¶ms))
.await?;
if response.errno != 0 {
let error_msg = get_create_error_message(response.errno);
return Err(NetDiskError::api_error(response.errno, &error_msg));
}
Ok(response)
}
}
#[derive(Debug, Clone)]
pub struct UploadChunkOptions {
pub path: String,
pub uploadid: String,
pub partseq: u32,
pub data: Vec<u8>,
}
impl UploadChunkOptions {
pub fn new(path: &str, uploadid: &str, partseq: u32, data: Vec<u8>) -> Self {
Self {
path: path.to_string(),
uploadid: uploadid.to_string(),
partseq,
data,
}
}
}
#[derive(Debug, Deserialize)]
pub struct UploadChunkResponse {
pub md5: String,
}
#[derive(Debug, Clone, Default)]
pub struct CreateFileOptions {
pub path: String,
pub size: u64,
pub isdir: i32,
pub block_list: Vec<String>,
pub uploadid: String,
pub rtype: i32,
pub local_ctime: Option<u64>,
pub local_mtime: Option<u64>,
}
impl CreateFileOptions {
pub fn new(path: &str, size: u64, block_list: Vec<String>, uploadid: &str) -> Self {
Self {
path: path.to_string(),
size,
isdir: 0,
block_list,
uploadid: uploadid.to_string(),
rtype: 1,
local_ctime: None,
local_mtime: None,
}
}
pub fn isdir(mut self, isdir: bool) -> Self {
self.isdir = if isdir { 1 } else { 0 };
self
}
pub fn rtype(mut self, rtype: i32) -> Self {
self.rtype = rtype;
self
}
pub fn local_ctime(mut self, ctime: u64) -> Self {
self.local_ctime = Some(ctime);
self
}
pub fn local_mtime(mut self, mtime: u64) -> Self {
self.local_mtime = Some(mtime);
self
}
}
#[derive(Debug, Deserialize)]
pub struct CreateFileResponse {
pub errno: i32,
#[serde(rename = "fs_id")]
pub fs_id: u64,
pub md5: Option<String>,
#[serde(rename = "server_filename")]
#[serde(default)]
pub server_filename: Option<String>,
pub category: i32,
pub path: String,
pub size: u64,
pub ctime: u64,
pub mtime: u64,
pub isdir: i32,
#[serde(default)]
pub name: Option<String>,
#[serde(rename = "from_type")]
#[serde(default)]
pub from_type: Option<i32>,
}
fn get_create_error_message(errno: i32) -> String {
match errno {
-7 => "File or directory name error or access denied".to_string(),
-8 => "File or directory already exists".to_string(),
-10 => "Cloud storage capacity full".to_string(),
10 => "Failed to create file".to_string(),
31190 => "File not found".to_string(),
31355 => "Invalid parameter".to_string(),
31365 => "Total file size limit exceeded".to_string(),
_ => format!("Unknown error: {}", errno),
}
}
fn get_error_message(errno: i32) -> String {
match errno {
-7 => "File or directory name error or access denied".to_string(),
-10 => "Insufficient capacity".to_string(),
_ => format!("Unknown error: {}", errno),
}
}