use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug, thiserror::Error)]
pub enum GcsError {
#[error("upload aborted")]
Aborted,
#[error("already finalized")]
AlreadyFinalized,
#[error("chunk out of range: offset {offset} but server has {server_bytes} bytes")]
ChunkOutOfRange {
offset: usize,
server_bytes: usize,
},
#[error("size mismatch: declared {declared}, uploaded {uploaded}")]
SizeMismatch {
declared: usize,
uploaded: usize,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UploadStatus {
Incomplete {
bytes_received: usize,
},
Complete {
total_bytes: usize,
},
}
pub struct GcsResumableUpload {
upload_id: String,
object_name: String,
bucket: String,
chunks: Vec<Vec<u8>>,
total_size: Option<usize>,
finalized: bool,
aborted: bool,
}
impl GcsResumableUpload {
pub fn initiate(bucket: &str, object_name: &str, total_size: Option<usize>) -> Self {
Self {
upload_id: generate_session_id(),
object_name: object_name.to_owned(),
bucket: bucket.to_owned(),
chunks: Vec::new(),
total_size,
finalized: false,
aborted: false,
}
}
pub fn upload_chunk(&mut self, offset: usize, data: &[u8]) -> Result<UploadStatus, GcsError> {
self.guard_active()?;
let server_bytes = self.query_status();
if offset != server_bytes {
return Err(GcsError::ChunkOutOfRange {
offset,
server_bytes,
});
}
if !data.is_empty() {
self.chunks.push(data.to_vec());
}
let bytes_received = self.query_status();
match self.total_size {
Some(total) if bytes_received >= total => Ok(UploadStatus::Complete {
total_bytes: bytes_received,
}),
_ => Ok(UploadStatus::Incomplete { bytes_received }),
}
}
pub fn finalize(&mut self) -> Result<usize, GcsError> {
self.guard_active()?;
let uploaded = self.query_status();
if let Some(declared) = self.total_size {
if uploaded != declared {
return Err(GcsError::SizeMismatch { declared, uploaded });
}
}
self.finalized = true;
Ok(uploaded)
}
pub fn abort(&mut self) -> Result<(), GcsError> {
if self.finalized {
return Err(GcsError::AlreadyFinalized);
}
self.chunks.clear();
self.aborted = true;
Ok(())
}
pub fn query_status(&self) -> usize {
self.chunks.iter().map(|c| c.len()).sum()
}
pub fn assembled_data(&self) -> Vec<u8> {
let total = self.query_status();
let mut out = Vec::with_capacity(total);
for chunk in &self.chunks {
out.extend_from_slice(chunk);
}
out
}
pub fn upload_id(&self) -> &str {
&self.upload_id
}
pub fn bucket(&self) -> &str {
&self.bucket
}
pub fn object_name(&self) -> &str {
&self.object_name
}
pub fn is_finalized(&self) -> bool {
self.finalized
}
pub fn is_aborted(&self) -> bool {
self.aborted
}
fn guard_active(&self) -> Result<(), GcsError> {
if self.aborted {
return Err(GcsError::Aborted);
}
if self.finalized {
return Err(GcsError::AlreadyFinalized);
}
Ok(())
}
}
fn generate_session_id() -> String {
static COUNTER: AtomicU64 = AtomicU64::new(0);
let count = COUNTER.fetch_add(1, Ordering::Relaxed);
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
format!("gcs-sim-{ts:032x}{count:016x}")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_single_chunk_roundtrip() {
let data = b"hello, gcs!";
let mut upload =
GcsResumableUpload::initiate("my-bucket", "obj/data.bin", Some(data.len()));
assert!(!upload.upload_id().is_empty());
assert_eq!(upload.bucket(), "my-bucket");
assert_eq!(upload.object_name(), "obj/data.bin");
let status = upload.upload_chunk(0, data).expect("upload_chunk");
assert_eq!(
status,
UploadStatus::Complete {
total_bytes: data.len()
}
);
let total = upload.finalize().expect("finalize");
assert_eq!(total, data.len());
assert_eq!(upload.assembled_data(), data);
assert!(upload.is_finalized());
}
#[test]
fn test_size_mismatch_on_finalize() {
let mut upload = GcsResumableUpload::initiate("b", "k", Some(100));
upload
.upload_chunk(0, b"only 9 bytes")
.expect("upload chunk");
let err = upload.finalize().expect_err("should fail");
assert!(matches!(err, GcsError::SizeMismatch { declared: 100, .. }));
}
#[test]
fn test_operations_after_abort_fail() {
let mut upload = GcsResumableUpload::initiate("b", "k", None);
upload.upload_chunk(0, b"some data").expect("first chunk");
upload.abort().expect("abort");
assert!(upload.is_aborted());
let err_chunk = upload.upload_chunk(9, b"more").expect_err("should fail");
assert!(matches!(err_chunk, GcsError::Aborted));
let err_finalize = upload.finalize().expect_err("should fail");
assert!(matches!(err_finalize, GcsError::Aborted));
}
#[test]
fn test_operations_after_finalize_fail() {
let data = b"finalized";
let mut upload = GcsResumableUpload::initiate("b", "k", Some(data.len()));
upload.upload_chunk(0, data).expect("upload");
upload.finalize().expect("first finalize");
let err_upload = upload
.upload_chunk(data.len(), b"x")
.expect_err("should fail");
assert!(matches!(err_upload, GcsError::AlreadyFinalized));
let err_finalize = upload.finalize().expect_err("should fail");
assert!(matches!(err_finalize, GcsError::AlreadyFinalized));
let err_abort = upload
.abort()
.expect_err("abort after finalize should fail");
assert!(matches!(err_abort, GcsError::AlreadyFinalized));
}
#[test]
fn test_query_status_tracks_bytes() {
let mut upload = GcsResumableUpload::initiate("b", "k", None);
assert_eq!(upload.query_status(), 0);
upload.upload_chunk(0, b"abc").expect("chunk 1");
assert_eq!(upload.query_status(), 3);
upload.upload_chunk(3, b"defgh").expect("chunk 2");
assert_eq!(upload.query_status(), 8);
}
#[test]
fn test_two_chunks_assemble_in_order() {
let mut upload = GcsResumableUpload::initiate("b", "k", None);
let part1 = b"FIRST_";
let part2 = b"SECOND";
upload.upload_chunk(0, part1).expect("chunk 1");
upload.upload_chunk(part1.len(), part2).expect("chunk 2");
upload.finalize().expect("finalize");
let assembled = upload.assembled_data();
let expected: Vec<u8> = [part1.as_ref(), part2.as_ref()].concat();
assert_eq!(assembled, expected);
}
#[test]
fn test_wrong_offset_returns_chunk_out_of_range() {
let mut upload = GcsResumableUpload::initiate("b", "k", None);
upload.upload_chunk(0, b"first").expect("chunk 1");
let err = upload.upload_chunk(99, b"second").expect_err("should fail");
assert!(matches!(
err,
GcsError::ChunkOutOfRange {
offset: 99,
server_bytes: 5
}
));
}
#[test]
fn test_unknown_total_size_finalize() {
let mut upload = GcsResumableUpload::initiate("b", "k", None);
upload.upload_chunk(0, b"anything").expect("chunk");
let total = upload.finalize().expect("finalize");
assert_eq!(total, 8);
}
#[test]
fn test_incomplete_status_until_last_chunk() {
let total_size = 10usize;
let mut upload = GcsResumableUpload::initiate("b", "k", Some(total_size));
let s1 = upload.upload_chunk(0, b"hello").expect("chunk 1");
assert_eq!(s1, UploadStatus::Incomplete { bytes_received: 5 });
let s2 = upload.upload_chunk(5, b"world").expect("chunk 2");
assert_eq!(s2, UploadStatus::Complete { total_bytes: 10 });
}
}