use crate::client::TokenGetter;
use crate::errors::{NetDiskError, NetDiskResult};
use crate::http::HttpClient;
use futures::stream::{self, StreamExt};
use log::debug;
use serde::Deserialize;
use std::sync::Arc;
/// Client for the NetDisk upload APIs: `precreate`, `locateupload`,
/// chunked `superfile2` uploads, and the finalizing `create` call.
#[derive(Debug, Clone)]
pub struct UploadClient {
    // Transport for all API calls; cloned for concurrent chunk uploads.
    http_client: HttpClient,
    // Token source queried before each API call via `get_token()`.
    token_getter: Arc<dyn TokenGetter>,
}
impl UploadClient {
pub fn new(http_client: HttpClient, token_getter: Arc<dyn TokenGetter>) -> Self {
Self {
http_client,
token_getter,
}
}
pub fn http_client(&self) -> &HttpClient {
&self.http_client
}
pub async fn precreate(&self, options: PrecreateOptions) -> NetDiskResult<PrecreateResponse> {
let token = self.token_getter.get_token().await?;
let block_list_json =
serde_json::to_string(&options.block_list).map_err(|e| NetDiskError::Unknown {
message: format!("Failed to serialize block_list: {}", e),
})?;
let params = vec![
("method", "precreate"),
("access_token", token.access_token.as_str()),
];
let size_str = options.size.to_string();
let isdir_str = options.isdir.to_string();
let rtype_str = options.rtype.to_string();
let form_data = vec![
("path", options.path.as_str()),
("size", size_str.as_str()),
("isdir", isdir_str.as_str()),
("block_list", block_list_json.as_str()),
("autoinit", "1"),
("rtype", rtype_str.as_str()),
];
debug!(
"Precreate upload: path={}, size={}, isdir={}, block_list={:?}",
options.path, options.size, options.isdir, options.block_list
);
let response: PrecreateResponse = self
.http_client
.post_form("/rest/2.0/xpan/file", Some(&form_data), Some(¶ms))
.await?;
if response.errno != 0 {
let error_msg = get_error_message(response.errno);
return Err(NetDiskError::api_error(response.errno, &error_msg));
}
debug!(
"Precreate success: uploadid={}, block_list={:?}",
response.uploadid, response.block_list
);
Ok(response)
}
}
/// Parameters for the xpan `precreate` API call.
#[derive(Debug, Clone, Default)]
pub struct PrecreateOptions {
    /// Absolute remote path of the target file.
    pub path: String,
    /// Total file size in bytes.
    pub size: u64,
    /// 1 = directory, 0 = regular file (see the `isdir` builder).
    pub isdir: i32,
    /// Hex MD5 of each chunk, in chunk order.
    pub block_list: Vec<String>,
    /// Conflict-resolution policy forwarded to the API.
    /// NOTE(review): presumably Baidu's rtype (1 = auto-rename) — confirm.
    pub rtype: i32,
    /// Existing upload session to resume, if any.
    pub uploadid: Option<String>,
    /// Whole-file MD5. NOTE(review): presumably used for rapid upload — confirm.
    pub content_md5: Option<String>,
    /// MD5 of the leading slice. NOTE(review): presumably for rapid upload — confirm.
    pub slice_md5: Option<String>,
    /// Local creation timestamp to record server-side.
    pub local_ctime: Option<u64>,
    /// Local modification timestamp to record server-side.
    pub local_mtime: Option<u64>,
}
impl PrecreateOptions {
    /// Start options for a regular-file upload (isdir = 0, rtype = 1,
    /// all optional fields unset).
    pub fn new(path: &str, size: u64, block_list: Vec<String>) -> Self {
        Self {
            path: path.to_owned(),
            size,
            block_list,
            rtype: 1,
            ..Self::default()
        }
    }

    /// Mark the target as a directory (`true`) or a regular file (`false`).
    pub fn isdir(self, isdir: bool) -> Self {
        Self {
            isdir: isdir as i32,
            ..self
        }
    }

    /// Set the conflict-resolution policy forwarded to the API.
    pub fn rtype(self, rtype: i32) -> Self {
        Self { rtype, ..self }
    }

    /// Resume an existing upload session.
    pub fn uploadid(self, uploadid: &str) -> Self {
        Self {
            uploadid: Some(uploadid.to_owned()),
            ..self
        }
    }

    /// Set the whole-file MD5.
    pub fn content_md5(self, md5: &str) -> Self {
        Self {
            content_md5: Some(md5.to_owned()),
            ..self
        }
    }

    /// Set the leading-slice MD5.
    pub fn slice_md5(self, md5: &str) -> Self {
        Self {
            slice_md5: Some(md5.to_owned()),
            ..self
        }
    }

    /// Set the local creation timestamp.
    pub fn local_ctime(self, ctime: u64) -> Self {
        Self {
            local_ctime: Some(ctime),
            ..self
        }
    }

    /// Set the local modification timestamp.
    pub fn local_mtime(self, mtime: u64) -> Self {
        Self {
            local_mtime: Some(mtime),
            ..self
        }
    }
}
/// Response payload of the `precreate` API.
///
/// The success-only fields carry `#[serde(default)]` so that error payloads
/// (`errno != 0`) which omit them still deserialize — otherwise the `errno`
/// check in `UploadClient::precreate` could never run and callers would see
/// a parse error instead of the API error.
#[derive(Debug, Deserialize)]
pub struct PrecreateResponse {
    /// API status; 0 means success.
    pub errno: i32,
    /// Echoed remote path, when present.
    #[serde(default)]
    pub path: Option<String>,
    /// Upload session id passed to chunk uploads and `create`;
    /// empty on error payloads.
    #[serde(default)]
    pub uploadid: String,
    /// NOTE(review): presumably Baidu's return_type (2 = rapid-upload hit) —
    /// confirm against the API docs.
    #[serde(default)]
    #[serde(rename = "return_type")]
    pub return_type: i32,
    /// Indices of the chunks the server still needs uploaded.
    #[serde(default)]
    #[serde(rename = "block_list")]
    pub block_list: Vec<u32>,
}
/// One upload-server entry in a `locateupload` response.
#[derive(Debug, Deserialize)]
pub struct LocateUploadServer {
    /// Server base URL (filtered for the `https://` scheme by callers).
    pub server: String,
}
/// Response payload of the PCS `locateupload` API.
///
/// Every field carries `#[serde(default)]` so that error payloads (which may
/// omit the server lists) and success payloads (which may omit
/// `error_code`/`error_msg`) both deserialize, letting
/// `UploadClient::locate_upload` inspect `error_code` instead of surfacing a
/// parse error.
#[derive(Debug, Deserialize)]
pub struct LocateUploadResponse {
    #[serde(default)]
    pub bak_server: Vec<String>,
    #[serde(default)]
    pub bak_servers: Vec<LocateUploadServer>,
    /// Caller's IP address as observed by the server.
    #[serde(default)]
    pub client_ip: String,
    /// API status; 0 means success.
    #[serde(default)]
    pub error_code: i32,
    #[serde(default)]
    pub error_msg: String,
    #[serde(default)]
    pub expire: i32,
    #[serde(default)]
    pub host: String,
    #[serde(default)]
    pub newno: String,
    #[serde(default)]
    pub quic_server: Vec<String>,
    #[serde(default)]
    pub quic_servers: Vec<LocateUploadServer>,
    #[serde(default)]
    pub request_id: u64,
    #[serde(default)]
    pub server: Vec<String>,
    #[serde(default)]
    pub server_time: u64,
    /// Primary upload-server candidates; see `get_https_servers`.
    #[serde(default)]
    pub servers: Vec<LocateUploadServer>,
    #[serde(default)]
    pub sl: i32,
}
impl LocateUploadResponse {
    /// All entries from `servers` whose URL starts with `https://`,
    /// in server-reported order.
    pub fn get_https_servers(&self) -> Vec<String> {
        self.servers
            .iter()
            .filter(|s| s.server.starts_with("https://"))
            .map(|s| s.server.clone())
            .collect()
    }

    /// First HTTPS server, if any.
    ///
    /// Scans lazily; the previous implementation materialized and cloned the
    /// entire HTTPS list just to take its first element.
    pub fn get_first_https_server(&self) -> Option<String> {
        self.servers
            .iter()
            .find(|s| s.server.starts_with("https://"))
            .map(|s| s.server.clone())
    }
}
impl UploadClient {
pub async fn locate_upload(
&self,
path: &str,
uploadid: &str,
) -> NetDiskResult<LocateUploadResponse> {
let token = self.token_getter.get_token().await?;
let url = format!(
"https://d.pcs.baidu.com/rest/2.0/pcs/file?method=locateupload&appid=250528&access_token={}&path={}&uploadid={}&upload_version=2.0",
urlencoding::encode(&token.access_token),
urlencoding::encode(path),
urlencoding::encode(uploadid)
);
debug!("Locate upload: path={}, uploadid={}", path, uploadid);
let response: LocateUploadResponse = self.http_client.get(&url, None).await?;
if response.error_code != 0 {
return Err(NetDiskError::api_error(
response.error_code,
&response.error_msg,
));
}
debug!(
"Locate upload success: host={}, servers={}",
response.host,
response.servers.len()
);
Ok(response)
}
pub async fn upload_chunk(
&self,
options: UploadChunkOptions,
server_url: Option<&str>,
) -> NetDiskResult<UploadChunkResponse> {
let token = self.token_getter.get_token().await?;
let server = server_url.unwrap_or("https://c3.pcs.baidu.com");
let url = format!(
"{}/rest/2.0/pcs/superfile2?method=upload&access_token={}&type=tmpfile&path={}&uploadid={}&partseq={}",
server,
urlencoding::encode(&token.access_token),
urlencoding::encode(&options.path),
urlencoding::encode(&options.uploadid),
options.partseq
);
debug!(
"Upload chunk: path={}, uploadid={}, partseq={}, data_size={}, server={}",
options.path,
options.uploadid,
options.partseq,
options.data.len(),
server
);
let response: UploadChunkResponse = self
.http_client
.post_multipart(
&url,
"file".to_string(),
"chunk.dat".to_string(),
options.data,
)
.await?;
Ok(response)
}
pub async fn upload_chunks_parallel(
&self,
remote_path: &str,
uploadid: &str,
chunks: Vec<(u32, Vec<u8>)>,
max_concurrency: usize,
server_url: Option<&str>,
) -> NetDiskResult<Vec<(u32, String)>> {
let token = self.token_getter.get_token().await?;
let server = server_url.unwrap_or("https://c3.pcs.baidu.com").to_string();
debug!(
"Uploading {} chunks in parallel (max_concurrency: {}, server: {})",
chunks.len(),
max_concurrency,
server
);
let access_token_str = token.access_token;
let remote_path_str = remote_path.to_string();
let uploadid_str = uploadid.to_string();
let http_client = self.http_client.clone();
let mut stream = stream::iter(chunks)
.map(move |(partseq, data)| {
let path = remote_path_str.clone();
let uid = access_token_str.clone();
let upid = uploadid_str.clone();
let client = http_client.clone();
let server_clone = server.clone();
async move {
let url = format!(
"{}/rest/2.0/pcs/superfile2?method=upload&access_token={}&type=tmpfile&path={}&uploadid={}&partseq={}",
server_clone,
urlencoding::encode(&uid),
urlencoding::encode(&path),
urlencoding::encode(&upid),
partseq
);
debug!("Uploading chunk {} ({} bytes)", partseq, data.len());
let response: UploadChunkResponse = client
.post_multipart(&url, "file".to_string(), "chunk.dat".to_string(), data)
.await?;
Ok((partseq, response.md5))
}
})
.buffer_unordered(max_concurrency);
let mut chunk_md5s = Vec::new();
while let Some(result) = stream.next().await {
match result {
Ok((partseq, md5)) => {
chunk_md5s.push((partseq, md5));
}
Err(e) => {
return Err(e);
}
}
}
debug!("All chunks uploaded successfully");
Ok(chunk_md5s)
}
pub async fn create_file(
&self,
options: CreateFileOptions,
) -> NetDiskResult<CreateFileResponse> {
let token = self.token_getter.get_token().await?;
let block_list_json =
serde_json::to_string(&options.block_list).map_err(|e| NetDiskError::Unknown {
message: format!("Failed to serialize block_list: {}", e),
})?;
let params = vec![
("method", "create"),
("access_token", token.access_token.as_str()),
];
let size_str = options.size.to_string();
let isdir_str = options.isdir.to_string();
let rtype_str = options.rtype.to_string();
let mut form_data = vec![
("path", options.path.as_str()),
("size", size_str.as_str()),
("isdir", isdir_str.as_str()),
("block_list", block_list_json.as_str()),
("uploadid", options.uploadid.as_str()),
("rtype", rtype_str.as_str()),
];
let ctime_str = options.local_ctime.map(|t| t.to_string());
let mtime_str = options.local_mtime.map(|t| t.to_string());
if let Some(ref ctime) = ctime_str {
form_data.push(("local_ctime", ctime.as_str()));
}
if let Some(ref mtime) = mtime_str {
form_data.push(("local_mtime", mtime.as_str()));
}
debug!(
"Create file: path={}, size={}, isdir={}, uploadid={}",
options.path, options.size, options.isdir, options.uploadid
);
let response: CreateFileResponse = self
.http_client
.post_form("/rest/2.0/xpan/file", Some(&form_data), Some(¶ms))
.await?;
if response.errno != 0 {
let error_msg = get_create_error_message(response.errno);
return Err(NetDiskError::api_error(response.errno, &error_msg));
}
Ok(response)
}
}
/// Parameters for a single `superfile2` chunk upload.
#[derive(Debug, Clone)]
pub struct UploadChunkOptions {
    /// Absolute remote path of the target file.
    pub path: String,
    /// Upload session id obtained from `precreate`.
    pub uploadid: String,
    /// Zero-based chunk index.
    pub partseq: u32,
    /// Raw chunk bytes.
    pub data: Vec<u8>,
}
impl UploadChunkOptions {
    /// Bundle the parameters for one chunk upload.
    pub fn new(path: &str, uploadid: &str, partseq: u32, data: Vec<u8>) -> Self {
        let path = path.to_owned();
        let uploadid = uploadid.to_owned();
        Self {
            path,
            uploadid,
            partseq,
            data,
        }
    }
}
/// Response payload of a `superfile2` chunk upload.
#[derive(Debug, Deserialize)]
pub struct UploadChunkResponse {
    /// Server-reported MD5 of the uploaded chunk (collected into the
    /// block list passed to `create`).
    pub md5: String,
}
/// Parameters for the xpan `create` API call that finalizes an upload.
#[derive(Debug, Clone, Default)]
pub struct CreateFileOptions {
    /// Absolute remote path of the target file.
    pub path: String,
    /// Total file size in bytes.
    pub size: u64,
    /// 1 = directory, 0 = regular file (see the `isdir` builder).
    pub isdir: i32,
    /// Hex MD5 of each chunk, in chunk order.
    pub block_list: Vec<String>,
    /// Upload session id obtained from `precreate`.
    pub uploadid: String,
    /// Conflict-resolution policy forwarded to the API.
    /// NOTE(review): presumably Baidu's rtype (1 = auto-rename) — confirm.
    pub rtype: i32,
    /// Local creation timestamp to record server-side.
    pub local_ctime: Option<u64>,
    /// Local modification timestamp to record server-side.
    pub local_mtime: Option<u64>,
}
impl CreateFileOptions {
    /// Start options for finalizing a regular-file upload
    /// (isdir = 0, rtype = 1, timestamps unset).
    pub fn new(path: &str, size: u64, block_list: Vec<String>, uploadid: &str) -> Self {
        Self {
            path: path.to_owned(),
            size,
            block_list,
            uploadid: uploadid.to_owned(),
            rtype: 1,
            ..Self::default()
        }
    }

    /// Mark the target as a directory (`true`) or a regular file (`false`).
    pub fn isdir(self, isdir: bool) -> Self {
        Self {
            isdir: isdir as i32,
            ..self
        }
    }

    /// Set the conflict-resolution policy forwarded to the API.
    pub fn rtype(self, rtype: i32) -> Self {
        Self { rtype, ..self }
    }

    /// Set the local creation timestamp.
    pub fn local_ctime(self, ctime: u64) -> Self {
        Self {
            local_ctime: Some(ctime),
            ..self
        }
    }

    /// Set the local modification timestamp.
    pub fn local_mtime(self, mtime: u64) -> Self {
        Self {
            local_mtime: Some(mtime),
            ..self
        }
    }
}
/// Response payload of the `create` API.
///
/// The file-metadata fields carry `#[serde(default)]` so that error payloads
/// (`errno != 0`) which omit them still deserialize — otherwise the `errno`
/// check in `UploadClient::create_file` could never run and callers would see
/// a parse error instead of the API error.
#[derive(Debug, Deserialize)]
pub struct CreateFileResponse {
    /// API status; 0 means success.
    pub errno: i32,
    /// Server-assigned file id.
    #[serde(default)]
    #[serde(rename = "fs_id")]
    pub fs_id: u64,
    /// Server-reported file MD5, when present.
    pub md5: Option<String>,
    #[serde(rename = "server_filename")]
    #[serde(default)]
    pub server_filename: Option<String>,
    /// NOTE(review): presumably Baidu's media category code — confirm.
    #[serde(default)]
    pub category: i32,
    /// Final remote path.
    #[serde(default)]
    pub path: String,
    /// File size in bytes.
    #[serde(default)]
    pub size: u64,
    #[serde(default)]
    pub ctime: u64,
    #[serde(default)]
    pub mtime: u64,
    /// 1 = directory, 0 = regular file.
    #[serde(default)]
    pub isdir: i32,
    #[serde(default)]
    pub name: Option<String>,
    #[serde(rename = "from_type")]
    #[serde(default)]
    pub from_type: Option<i32>,
}
/// Map a `create` API errno to a human-readable message.
fn get_create_error_message(errno: i32) -> String {
    let known = match errno {
        -7 => Some("File or directory name error or access denied"),
        -8 => Some("File or directory already exists"),
        -10 => Some("Cloud storage capacity full"),
        10 => Some("Failed to create file"),
        31190 => Some("File not found"),
        31355 => Some("Invalid parameter"),
        31365 => Some("Total file size limit exceeded"),
        _ => None,
    };
    match known {
        Some(msg) => msg.to_string(),
        None => format!("Unknown error: {}", errno),
    }
}
/// Map a `precreate` API errno to a human-readable message.
fn get_error_message(errno: i32) -> String {
    let msg = match errno {
        -7 => "File or directory name error or access denied",
        -10 => "Insufficient capacity",
        other => return format!("Unknown error: {}", other),
    };
    msg.to_string()
}
// Default chunk size: 4 MiB.
const DEFAULT_CHUNK_SIZE: usize = 4 * 1024 * 1024;
// Default number of chunks uploaded concurrently.
const DEFAULT_MAX_CONCURRENCY: usize = 10;
/// Tuning knobs for the high-level `upload_*` helpers.
#[derive(Debug, Clone)]
pub struct SimpleUploadOptions {
    /// Size of each upload chunk in bytes.
    pub chunk_size: usize,
    /// Maximum number of chunks uploaded in parallel.
    pub max_concurrency: usize,
    /// Conflict-resolution policy (`rtype`) passed to precreate/create.
    pub r#type: i32,
}
impl Default for SimpleUploadOptions {
fn default() -> Self {
Self {
chunk_size: DEFAULT_CHUNK_SIZE,
max_concurrency: DEFAULT_MAX_CONCURRENCY,
r#type: 1,
}
}
}
impl SimpleUploadOptions {
    /// Equivalent to [`SimpleUploadOptions::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Override the chunk size in bytes.
    pub fn chunk_size(self, size: usize) -> Self {
        Self {
            chunk_size: size,
            ..self
        }
    }

    /// Override the maximum number of concurrent chunk uploads.
    pub fn max_concurrency(self, concurrency: usize) -> Self {
        Self {
            max_concurrency: concurrency,
            ..self
        }
    }

    /// Override the conflict-resolution policy (`rtype`).
    pub fn r#type(self, r#type: i32) -> Self {
        Self { r#type, ..self }
    }
}
impl UploadClient {
    /// Upload a local file to `remote_path` using default options.
    pub async fn upload_file<P: AsRef<std::path::Path>>(
        &self,
        local_path: P,
        remote_path: &str,
    ) -> NetDiskResult<CreateFileResponse> {
        self.upload_file_with_options(local_path, remote_path, SimpleUploadOptions::default())
            .await
    }

    /// Upload a local file to `remote_path` with explicit options.
    ///
    /// Opens the file and delegates to [`Self::upload_reader_with_options`].
    ///
    /// # Errors
    /// Fails if the file cannot be opened or stat'ed, or if any upload step
    /// fails.
    pub async fn upload_file_with_options<P: AsRef<std::path::Path>>(
        &self,
        local_path: P,
        remote_path: &str,
        options: SimpleUploadOptions,
    ) -> NetDiskResult<CreateFileResponse> {
        let file = std::fs::File::open(&local_path).map_err(|e| NetDiskError::Unknown {
            message: format!(
                "Failed to open file {}: {}",
                local_path.as_ref().display(),
                e
            ),
        })?;
        let metadata = file.metadata().map_err(|e| NetDiskError::Unknown {
            message: format!(
                "Failed to get file metadata {}: {}",
                local_path.as_ref().display(),
                e
            ),
        })?;
        let file_size = metadata.len();
        debug!("File opened successfully: {} bytes", file_size);
        let mut reader = std::io::BufReader::new(file);
        self.upload_reader_with_options(&mut reader, file_size, remote_path, options)
            .await
    }

    /// Upload from any `Read + Seek` source using default options.
    pub async fn upload_reader<R: std::io::Read + std::io::Seek>(
        &self,
        reader: &mut R,
        file_size: u64,
        remote_path: &str,
    ) -> NetDiskResult<CreateFileResponse> {
        self.upload_reader_with_options(
            reader,
            file_size,
            remote_path,
            SimpleUploadOptions::default(),
        )
        .await
    }

    /// Read exactly `chunk_size` bytes, or whatever remains before EOF.
    ///
    /// `Read::read` may return short reads mid-stream; chunk boundaries must
    /// be identical across both passes or the MD5s sent to `precreate` would
    /// not correspond to the chunks actually uploaded. So we fill the buffer
    /// until it is full or EOF is hit, retrying `Interrupted` errors as
    /// `read_exact` does.
    fn read_full_chunk<R: std::io::Read>(
        reader: &mut R,
        chunk_size: usize,
    ) -> std::io::Result<Vec<u8>> {
        let mut buffer = vec![0u8; chunk_size];
        let mut filled = 0usize;
        while filled < chunk_size {
            match reader.read(&mut buffer[filled..]) {
                Ok(0) => break, // EOF
                Ok(n) => filled += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        buffer.truncate(filled);
        Ok(buffer)
    }

    /// Two-pass chunked upload from a seekable reader.
    ///
    /// Pass 1 computes per-chunk MD5s for `precreate`; after a rewind, pass 2
    /// re-reads the data and uploads only the chunks the server reported
    /// missing, in batches of `max_concurrency * 2`.
    ///
    /// Fixes over the previous version:
    /// - short reads can no longer shift chunk boundaries (`read_full_chunk`);
    /// - read errors are propagated instead of silently treated as EOF,
    ///   which used to truncate the upload;
    /// - leftover pending chunks are always flushed after the read loop
    ///   instead of relying on the first-pass chunk count.
    pub async fn upload_reader_with_options<R: std::io::Read + std::io::Seek>(
        &self,
        reader: &mut R,
        file_size: u64,
        remote_path: &str,
        options: SimpleUploadOptions,
    ) -> NetDiskResult<CreateFileResponse> {
        let chunk_size = options.chunk_size;
        let max_concurrency = options.max_concurrency;
        let r#type = options.r#type;
        debug!(
            "upload_reader: file_size={} bytes, chunk_size={}",
            file_size, chunk_size
        );
        // Pass 1: hash every chunk so precreate can report which are needed.
        let mut block_list: Vec<String> = Vec::new();
        loop {
            let chunk =
                Self::read_full_chunk(reader, chunk_size).map_err(|e| NetDiskError::Unknown {
                    message: format!("Failed to read input (first pass): {}", e),
                })?;
            if chunk.is_empty() {
                break;
            }
            block_list.push(format!("{:x}", md5::compute(&chunk)));
        }
        debug!("First pass: read {} chunks", block_list.len());
        reader.rewind().map_err(|e| NetDiskError::Unknown {
            message: format!("Failed to rewind reader: {}", e),
        })?;
        let precreate_options =
            PrecreateOptions::new(remote_path, file_size, block_list.clone()).rtype(r#type);
        let precreate_response = self.precreate(precreate_options).await?;
        let missing_blocks: Vec<u32> = precreate_response.block_list.clone();
        debug!(
            "Server returned {} blocks need upload",
            missing_blocks.len()
        );
        let final_block_list: Vec<String> = if missing_blocks.is_empty() {
            // Nothing to upload; finalize with the locally computed MD5s.
            block_list
        } else {
            let locate_response = self
                .locate_upload(remote_path, &precreate_response.uploadid)
                .await?;
            let upload_server = locate_response.get_first_https_server();
            debug!("Located upload server: {:?}", upload_server);
            let missing_blocks_set: std::collections::HashSet<u32> =
                missing_blocks.into_iter().collect();
            let batch_size = max_concurrency * 2;
            let mut pending_chunks: Vec<(u32, Vec<u8>)> = Vec::with_capacity(batch_size);
            let mut all_chunk_md5s: Vec<(u32, String)> = Vec::new();
            let mut partseq = 0u32;
            // Pass 2: keep local MD5s for chunks the server already has and
            // upload the missing ones in bounded batches.
            loop {
                let chunk = Self::read_full_chunk(reader, chunk_size).map_err(|e| {
                    NetDiskError::Unknown {
                        message: format!("Failed to read input (second pass): {}", e),
                    }
                })?;
                if chunk.is_empty() {
                    break;
                }
                if missing_blocks_set.contains(&partseq) {
                    pending_chunks.push((partseq, chunk));
                } else {
                    all_chunk_md5s.push((partseq, format!("{:x}", md5::compute(&chunk))));
                }
                if pending_chunks.len() >= batch_size {
                    let batch_results = self
                        .upload_chunks_parallel(
                            remote_path,
                            &precreate_response.uploadid,
                            std::mem::take(&mut pending_chunks),
                            max_concurrency,
                            upload_server.as_deref(),
                        )
                        .await?;
                    all_chunk_md5s.extend(batch_results);
                }
                partseq += 1;
            }
            // Flush whatever is left; previously this depended on the
            // first-pass chunk count matching the second pass exactly.
            if !pending_chunks.is_empty() {
                let batch_results = self
                    .upload_chunks_parallel(
                        remote_path,
                        &precreate_response.uploadid,
                        std::mem::take(&mut pending_chunks),
                        max_concurrency,
                        upload_server.as_deref(),
                    )
                    .await?;
                all_chunk_md5s.extend(batch_results);
            }
            // Restore chunk order before assembling the final block list.
            all_chunk_md5s.sort_by_key(|(i, _)| *i);
            all_chunk_md5s.into_iter().map(|(_, md5)| md5).collect()
        };
        let create_options = CreateFileOptions::new(
            remote_path,
            file_size,
            final_block_list,
            &precreate_response.uploadid,
        )
        .rtype(r#type);
        self.create_file(create_options).await
    }

    /// Upload an in-memory byte slice using default options.
    pub async fn upload_bytes(
        &self,
        data: &[u8],
        remote_path: &str,
    ) -> NetDiskResult<CreateFileResponse> {
        self.upload_bytes_with_options(data, remote_path, SimpleUploadOptions::default())
            .await
    }

    /// Upload an in-memory byte slice with explicit options.
    ///
    /// Bug fix: when the server reported only *some* chunks missing, the old
    /// code sent `create` a block list containing only the uploaded chunks'
    /// MD5s, silently dropping the chunks the server already had. The final
    /// list now merges the locally computed MD5s of present chunks with the
    /// upload results, matching `upload_reader_with_options`.
    pub async fn upload_bytes_with_options(
        &self,
        data: &[u8],
        remote_path: &str,
        options: SimpleUploadOptions,
    ) -> NetDiskResult<CreateFileResponse> {
        let file_size = data.len() as u64;
        let chunk_size = options.chunk_size;
        let max_concurrency = options.max_concurrency;
        let r#type = options.r#type;
        let block_list: Vec<String> = data
            .chunks(chunk_size)
            .map(|chunk| format!("{:x}", md5::compute(chunk)))
            .collect();
        let precreate_options =
            PrecreateOptions::new(remote_path, file_size, block_list.clone()).rtype(r#type);
        let precreate_response = self.precreate(precreate_options).await?;
        let missing_blocks_set: std::collections::HashSet<u32> =
            precreate_response.block_list.clone().into_iter().collect();
        let final_block_list: Vec<String> = if missing_blocks_set.is_empty() {
            block_list
        } else {
            let chunks_to_upload: Vec<(u32, Vec<u8>)> = data
                .chunks(chunk_size)
                .enumerate()
                .filter(|(i, _)| missing_blocks_set.contains(&(*i as u32)))
                .map(|(i, chunk)| (i as u32, chunk.to_vec()))
                .collect();
            let locate_response = self
                .locate_upload(remote_path, &precreate_response.uploadid)
                .await?;
            let upload_server = locate_response.get_first_https_server();
            debug!("Located upload server: {:?}", upload_server);
            let chunk_results = self
                .upload_chunks_parallel(
                    remote_path,
                    &precreate_response.uploadid,
                    chunks_to_upload,
                    max_concurrency,
                    upload_server.as_deref(),
                )
                .await?;
            // Merge uploaded-chunk MD5s with the local MD5s of chunks the
            // server already had, then restore chunk order.
            let mut merged: Vec<(u32, String)> = block_list
                .iter()
                .enumerate()
                .filter(|(i, _)| !missing_blocks_set.contains(&(*i as u32)))
                .map(|(i, md5)| (i as u32, md5.clone()))
                .collect();
            merged.extend(chunk_results);
            merged.sort_by_key(|(i, _)| *i);
            merged.into_iter().map(|(_, md5)| md5).collect()
        };
        let create_options = CreateFileOptions::new(
            remote_path,
            file_size,
            final_block_list,
            &precreate_response.uploadid,
        )
        .rtype(r#type);
        self.create_file(create_options).await
    }
}