use crate::api::files::{
upload_session_append::UploadSessionAppendRequest,
upload_session_finish::UploadSessionFinishRequest,
upload_session_start::UploadSessionStartRequest, CommitInfo, FileMetadata,
UploadSessionAppendArg, UploadSessionCursor, UploadSessionFinishArg, UploadSessionStartArg,
};
use crate::api::Service;
use anyhow::{Context, Result};
use tokio::io::{AsyncRead, AsyncReadExt};
/// Default chunk size in bytes: 4 MiB (4 * 1024 * 1024 = 4_194_304).
///
/// NOTE(review): presumably meant to be passed as the `chunk_size` argument of
/// `upload_large_file`; nothing in this file reads it directly.
pub const DEFAULT_CHUNK_SIZE: usize = 4 << 20;
pub async fn upload_large_file<R: AsyncRead + Unpin>(
token: &str,
path: &str,
mut reader: R,
chunk_size: usize,
mode: crate::api::files::WriteMode,
) -> Result<FileMetadata> {
let mut first_chunk = vec![0u8; chunk_size];
let mut first_read = 0usize;
while first_read < chunk_size {
let n = reader.read(&mut first_chunk[first_read..]).await?;
if n == 0 {
break;
}
first_read += n;
}
first_chunk.truncate(first_read);
let eof_after_first = first_read < chunk_size;
let start_req = UploadSessionStartRequest {
access_token: token,
payload: Some(UploadSessionStartArg {
close: Some(eof_after_first),
session_type: None,
content_hash: None,
}),
data: Some(first_chunk),
};
let start_resp = start_req
.call()
.await?
.context("upload_session/start returned empty")?;
let session_id = start_resp.payload.session_id;
let mut offset = first_read as u64;
if !eof_after_first {
loop {
let mut buf = vec![0u8; chunk_size];
let mut read = 0usize;
while read < chunk_size {
let n = reader.read(&mut buf[read..]).await?;
if n == 0 {
break;
}
read += n;
}
buf.truncate(read);
if read == chunk_size {
let append_req = UploadSessionAppendRequest {
access_token: token,
payload: Some(UploadSessionAppendArg {
cursor: UploadSessionCursor {
session_id: session_id.clone(),
offset,
},
close: Some(false),
content_hash: None,
}),
data: Some(buf),
};
let _ = append_req.call().await?;
offset += read as u64;
} else {
let finish_req = UploadSessionFinishRequest {
access_token: token,
payload: Some(UploadSessionFinishArg {
cursor: UploadSessionCursor {
session_id: session_id.clone(),
offset,
},
commit: CommitInfo {
path: path.to_string(),
mode: mode.clone(),
autorename: true,
client_modified: None,
mute: false,
property_groups: None,
strict_conflict: None,
},
content_hash: None,
}),
data: Some(buf),
};
let resp = finish_req
.call()
.await?
.context("upload_session/finish returned empty")?;
return Ok(resp.payload);
}
}
}
let finish_req = UploadSessionFinishRequest {
access_token: token,
payload: Some(UploadSessionFinishArg {
cursor: UploadSessionCursor { session_id, offset },
commit: CommitInfo {
path: path.to_string(),
mode,
autorename: true,
client_modified: None,
mute: false,
property_groups: None,
strict_conflict: None,
},
content_hash: None,
}),
data: Some(Vec::new()),
};
let resp = finish_req
.call()
.await?
.context("upload_session/finish returned empty")?;
Ok(resp.payload)
}
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::upload_large_file;
    use crate::api::files::WriteMode;
    use crate::tests_utils::with_test_server_async;
    use std::io::Cursor;

    /// Canned JSON body served by the mocked `upload_session/start` endpoint.
    const START_BODY: &str = r#"{"session_id":"session-1"}"#;

    /// Canned JSON body served by the mocked `upload_session/finish` endpoint.
    const FINISH_BODY: &str = r#"{"name":"hi.txt","id":"id:abc","client_modified":"2025-01-01T00:00:00Z","server_modified":"2025-01-01T00:00:00Z","rev":"r1","size":5,"path_lower":"/hi.txt","path_display":"/hi.txt","is_downloadable":true}"#;

    /// A 5-byte input with a 4096-byte chunk size must hit the start endpoint
    /// and the finish endpoint, and return the metadata from the finish body.
    #[tokio::test]
    async fn single_chunk_file_does_start_then_finish() {
        with_test_server_async(|mut server| async move {
            // Register both endpoints the upload is expected to call.
            let start_endpoint = server
                .mock("POST", "/2/files/upload_session/start")
                .with_status(200)
                .with_header("Content-Type", "application/json")
                .with_body(START_BODY)
                .create_async()
                .await;
            let finish_endpoint = server
                .mock("POST", "/2/files/upload_session/finish")
                .with_status(200)
                .with_header("Content-Type", "application/json")
                .with_body(FINISH_BODY)
                .create_async()
                .await;

            // The payload is far smaller than the chunk size.
            let payload = b"hello".to_vec();
            let outcome =
                upload_large_file("test", "/hi.txt", Cursor::new(payload), 4096, WriteMode::Add)
                    .await;
            let metadata = outcome.expect("upload_large_file returned error");

            assert_eq!(metadata.name, "hi.txt");
            assert_eq!(metadata.size, 5);

            // Both endpoints must have been called.
            start_endpoint.assert();
            finish_endpoint.assert();
        })
        .await;
    }
}