use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use futures_core::Stream;
use futures_util::StreamExt;
use crate::QuarkPanInner;
use crate::error::{QuarkPanError, Result};
use crate::model::{
UpAuthAndCommitRequest, UpPartMethodRequest, UploadComplete, UploadPrepareResult, UploadResume,
UploadResumeState, UploadSession,
};
/// Builder for uploading a file to Quark Pan.
///
/// Obtain one via [`UploadBuilder::new`], set the required fields
/// (`file_name`, `size`, `md5`, `sha1`) and optionally the parent
/// directory, then call `prepare` (or `resume` to restore a session).
pub struct UploadBuilder {
    // Shared client internals providing the API handle.
    inner: Arc<QuarkPanInner>,
    // Parent directory fid; defaults to "0" (root).
    pdir_fid: String,
    // Target file name; required before `prepare`.
    file_name: Option<String>,
    // Declared file size in bytes; required before `prepare`.
    size: Option<u64>,
    // Full-file MD5 hex digest; required before `prepare`.
    md5: Option<String>,
    // Full-file SHA-1 hex digest; required before `prepare`.
    sha1: Option<String>,
}
impl UploadBuilder {
    /// Creates a builder targeting the root directory ("0") with all
    /// required fields unset.
    pub(crate) fn new(inner: Arc<QuarkPanInner>) -> Self {
        Self {
            inner,
            pdir_fid: String::from("0"),
            file_name: None,
            size: None,
            md5: None,
            sha1: None,
        }
    }

    /// Sets the fid of the directory the file will be created in.
    pub fn pdir_fid(mut self, pdir_fid: impl Into<String>) -> Self {
        self.pdir_fid = pdir_fid.into();
        self
    }

    /// Sets the name the uploaded file will have.
    pub fn file_name(mut self, file_name: impl Into<String>) -> Self {
        self.file_name = Some(file_name.into());
        self
    }

    /// Sets the total file size in bytes.
    pub fn size(mut self, size: u64) -> Self {
        self.size = Some(size);
        self
    }

    /// Sets the full-file MD5 hex digest.
    pub fn md5(mut self, md5: impl Into<String>) -> Self {
        self.md5 = Some(md5.into());
        self
    }

    /// Sets the full-file SHA-1 hex digest.
    pub fn sha1(mut self, sha1: impl Into<String>) -> Self {
        self.sha1 = Some(sha1.into());
        self
    }

    /// Negotiates the upload with the server.
    ///
    /// Returns [`UploadPrepareResult::RapidUploaded`] when the server
    /// already holds the content (reported either by `up_pre` directly
    /// or by the subsequent hash probe), otherwise an
    /// [`UploadSession`] describing the multipart upload still to be
    /// performed.
    ///
    /// # Errors
    ///
    /// Fails when any of `file_name`, `size`, `md5`, `sha1` was not
    /// set, when an API call fails, or when the prepare response lacks
    /// an `upload_id` even though an actual upload is required.
    pub async fn prepare(self) -> Result<UploadPrepareResult> {
        let name = self
            .file_name
            .ok_or_else(|| QuarkPanError::missing_field("file_name"))?;
        let declared_size = self
            .size
            .ok_or_else(|| QuarkPanError::missing_field("size"))?;
        let md5 = self
            .md5
            .ok_or_else(|| QuarkPanError::missing_field("md5"))?;
        let sha1 = self
            .sha1
            .ok_or_else(|| QuarkPanError::missing_field("sha1"))?;

        let pre = self
            .inner
            .api
            .up_pre(&name, declared_size, &self.pdir_fid)
            .await?;
        // Server may already have the content: instant ("rapid") upload.
        if pre.data.finish {
            return Ok(UploadPrepareResult::RapidUploaded { fid: pre.data.fid });
        }

        // Second chance at a rapid upload: probe by content hashes.
        let task_id = pre.data.task_id.clone();
        let hash = self.inner.api.up_hash(&md5, &sha1, &task_id).await?;
        if hash.data.finish {
            return Ok(UploadPrepareResult::RapidUploaded { fid: pre.data.fid });
        }

        let upload_id = pre.data.upload_id.ok_or_else(|| {
            QuarkPanError::invalid_argument("missing upload_id in prepare response")
        })?;
        // Fall back to a generic MIME type when the server reports none.
        let mime_type = if pre.data.format_type.is_empty() {
            "application/octet-stream".to_string()
        } else {
            pre.data.format_type
        };
        // The upload host is stored without its URL scheme.
        let upload_url = pre
            .data
            .upload_url
            .trim_start_matches("https://")
            .trim_start_matches("http://")
            .to_string();

        Ok(UploadPrepareResult::NeedUpload(UploadSession {
            api: self.inner.api.clone(),
            fid: pre.data.fid,
            size: declared_size,
            mime_type,
            part_size: pre.metadata.part_size,
            auth_info: pre.data.auth_info,
            callback: pre.data.callback,
            bucket: pre.data.bucket,
            obj_key: pre.data.obj_key,
            upload_id,
            upload_url,
            task_id,
        }))
    }
}
impl UploadBuilder {
    /// Rebuilds an [`UploadSession`] from previously persisted resume
    /// data, reattaching the client's API handle.
    ///
    /// Note: this performs no validation of `resume`; the caller is
    /// responsible for persisting/restoring it faithfully.
    pub fn resume(self, resume: UploadResume) -> UploadSession {
        UploadSession {
            // `inner` is behind an Arc, so the API handle must be cloned
            // rather than moved out.
            api: self.inner.api.clone(),
            fid: resume.fid,
            size: resume.size,
            mime_type: resume.mime_type,
            part_size: resume.part_size,
            auth_info: resume.auth_info,
            callback: resume.callback,
            bucket: resume.bucket,
            obj_key: resume.obj_key,
            upload_id: resume.upload_id,
            upload_url: resume.upload_url,
            task_id: resume.task_id,
        }
    }
}
/// Uploads `stream` through `session` without resume support.
///
/// Thin wrapper over [`upload_stream_resumable`] that starts from a
/// fresh [`UploadResumeState`] and discards progress notifications.
pub(crate) async fn upload_stream<S, E>(session: UploadSession, stream: S) -> Result<UploadComplete>
where
    S: Stream<Item = std::result::Result<Bytes, E>> + Send + 'static,
    E: Into<QuarkPanError>,
{
    let fresh_state = UploadResumeState::default();
    upload_stream_resumable(session, stream, fresh_state, |_| Ok(())).await
}
/// Uploads `stream` through `session` as numbered parts, supporting
/// resumption.
///
/// `state` describes progress already made: parts numbered below
/// `state.next_part_number` are assumed uploaded (their etags are in
/// `state.part_etags`), and the caller must supply a `stream` that
/// starts at the corresponding byte offset — this function does not
/// skip bytes itself. `on_part_uploaded` is invoked after every
/// successfully uploaded part so the caller can persist `state`.
///
/// # Errors
///
/// Fails when `session.part_size` is zero, when the resume state is
/// internally inconsistent, when the stream yields more or fewer bytes
/// than `session.size` declares, or when a part upload / commit call
/// exhausts its retries.
pub(crate) async fn upload_stream_resumable<S, E, F>(
    session: UploadSession,
    stream: S,
    mut state: UploadResumeState,
    mut on_part_uploaded: F,
) -> Result<UploadComplete>
where
    S: Stream<Item = std::result::Result<Bytes, E>> + Send + 'static,
    E: Into<QuarkPanError>,
    F: FnMut(&UploadResumeState) -> Result<()> + Send + 'static,
{
    let mut stream = Box::pin(stream);
    let part_size = session.part_size as usize;
    if part_size == 0 {
        return Err(QuarkPanError::invalid_argument(
            "part_size must be greater than 0",
        ));
    }
    // Bytes already accounted for by previously uploaded parts (the
    // resume offset the caller's stream is expected to start at).
    let mut sent: u64 = ((state.next_part_number.saturating_sub(1)) as u64) * session.part_size;
    // Treat a default (zero) next_part_number as "start from part 1".
    let mut part_number: u32 = if state.next_part_number == 0 {
        1
    } else {
        state.next_part_number
    };
    // Guard against corrupt resume state: there must be exactly one
    // etag per completed part, otherwise the final commit would list
    // the wrong md5s.
    if state.part_etags.len() as u64 != u64::from(part_number) - 1 {
        return Err(QuarkPanError::invalid_argument(
            "resume state mismatch: part_etags length does not match next_part_number",
        ));
    }
    let mut buffer = bytes::BytesMut::new();
    while let Some(chunk) = stream.next().await {
        let chunk = chunk.map_err(Into::into)?;
        sent += chunk.len() as u64;
        if sent > session.size {
            return Err(QuarkPanError::invalid_argument(
                "stream produced more bytes than declared size",
            ));
        }
        buffer.extend_from_slice(&chunk);
        // Flush every full part as soon as it is buffered.
        while buffer.len() >= part_size {
            let bytes = buffer.split_to(part_size).freeze();
            let etag = upload_part(&session, part_number, bytes).await?;
            state.part_etags.push(etag);
            part_number += 1;
            state.next_part_number = part_number;
            on_part_uploaded(&state)?;
        }
    }
    // Validate the total BEFORE flushing the trailing partial part, so a
    // short stream does not upload a bogus final part (and persist it via
    // on_part_uploaded) only to fail immediately afterwards.
    if sent != session.size {
        return Err(QuarkPanError::invalid_argument(format!(
            "stream size mismatch: declared {}, actual {}",
            session.size, sent
        )));
    }
    if !buffer.is_empty() {
        let etag = upload_part(&session, part_number, buffer.freeze()).await?;
        state.part_etags.push(etag);
        part_number += 1;
        state.next_part_number = part_number;
        on_part_uploaded(&state)?;
    }
    let commit_request = UpAuthAndCommitRequest {
        md5s: state.part_etags,
        callback: session.callback.clone(),
        bucket: session.bucket.clone(),
        obj_key: session.obj_key.clone(),
        upload_id: session.upload_id.clone(),
        auth_info: session.auth_info.clone(),
        task_id: session.task_id.clone(),
        upload_url: session.upload_url.clone(),
    };
    // Commit the multipart upload, then mark the task finished; both
    // calls are retried with backoff.
    retry_async(3, || session.api.up_auth_and_commit(commit_request.clone())).await?;
    retry_async(3, || session.api.finish(&session.obj_key, &session.task_id)).await?;
    Ok(UploadComplete {
        fid: session.fid,
        rapid_upload: false,
    })
}
/// Uploads a single part with up to three attempts, returning its etag.
///
/// Each attempt re-derives the auth signature (it embeds the current
/// UTC time, so it cannot be reused across attempts) and then performs
/// the part upload. Failed attempts back off exponentially (1s, 2s);
/// no sleep is performed after the final failure — previously this
/// slept even after the last attempt, delaying the error for no reason
/// (cf. the guarded backoff in `retry_async`).
///
/// # Errors
///
/// Returns the last auth/upload error once all attempts are exhausted.
async fn upload_part(session: &UploadSession, part_number: u32, bytes: Bytes) -> Result<String> {
    const ATTEMPTS: u32 = 3;
    let mut delay = Duration::from_secs(1);
    let mut last_err = None;
    for attempt in 0..ATTEMPTS {
        // Signature is time-sensitive: rebuild the timestamp and the
        // auth metadata for every attempt.
        let utc_time = chrono::Utc::now()
            .format("%a, %d %b %Y %H:%M:%S GMT")
            .to_string();
        let auth_meta = session.api.up_part_auth_meta(
            &session.mime_type,
            &utc_time,
            &session.bucket,
            &session.obj_key,
            part_number,
            &session.upload_id,
        );
        match session
            .api
            .auth(&session.auth_info, &auth_meta, &session.task_id)
            .await
        {
            Ok(auth) => match session
                .api
                .up_part(UpPartMethodRequest {
                    auth_key: auth.data.auth_key,
                    mime_type: session.mime_type.clone(),
                    utc_time,
                    bucket: session.bucket.clone(),
                    upload_url: session.upload_url.clone(),
                    obj_key: session.obj_key.clone(),
                    part_number,
                    upload_id: session.upload_id.clone(),
                    part_bytes: bytes.clone(),
                })
                .await
            {
                Ok(etag) => return Ok(etag),
                Err(err) => last_err = Some(err),
            },
            Err(err) => last_err = Some(err),
        }
        // Back off before the next attempt, but not after the last one.
        if attempt + 1 < ATTEMPTS {
            tokio::time::sleep(delay).await;
            delay = delay.saturating_mul(2);
        }
    }
    Err(last_err.unwrap_or_else(|| QuarkPanError::invalid_argument("upload part failed")))
}
/// Runs `f` until it succeeds, at most `attempts` times, sleeping with
/// exponential backoff (1s, 2s, 4s, …) between failed attempts.
///
/// Returns the last error once every attempt has failed; with
/// `attempts == 0` it fails immediately with a generic error.
async fn retry_async<T, Fut, F>(attempts: u32, mut f: F) -> Result<T>
where
    Fut: std::future::Future<Output = Result<T>>,
    F: FnMut() -> Fut,
{
    let mut backoff = Duration::from_secs(1);
    let mut failure = None;
    for remaining in (0..attempts).rev() {
        match f().await {
            Ok(value) => return Ok(value),
            Err(err) => {
                failure = Some(err);
                // Only sleep when another attempt will actually follow.
                if remaining > 0 {
                    tokio::time::sleep(backoff).await;
                    backoff = backoff.saturating_mul(2);
                }
            }
        }
    }
    Err(failure.unwrap_or_else(|| QuarkPanError::invalid_argument("retry failed")))
}