use std::fs::File;
use std::io;
use std::sync::Arc;
use bytes::Bytes;
use super::ObjectStoreError;
use super::error::other_boxed;
/// Object size in bytes (64 MiB) at or above which `should_use_multipart` selects a multipart put.
pub(crate) const MULTIPART_PUT_THRESHOLD: u64 = 64 << 20;
/// Default target size in bytes (16 MiB) for each part; `plan_upload_parts` may scale it up
/// (by doubling) to stay within a part-count cap.
pub(crate) const MULTIPART_PUT_PART_SIZE: u64 = 16 << 20;
/// Concurrency limit for multipart part uploads; not referenced in this section — presumably
/// caps the number of in-flight parts (confirm at the call site).
pub(crate) const MULTIPART_PUT_MAX_CONCURRENCY: usize = 8;
/// Part-count cap passed as `max_parts` when planning S3 uploads (intended to mirror S3's
/// 10,000-part multipart limit).
pub(crate) const S3_MAX_PARTS: u64 = 10_000;
/// Block-count cap passed as `max_parts` when planning Azure uploads (intended to mirror the
/// 50,000-block limit of an Azure block blob).
pub(crate) const AZURE_MAX_BLOCKS: u64 = 50_000;
/// One contiguous byte range of an upload's source data.
///
/// `offset` is the first byte of the range and `length` is its size in bytes. The parts produced
/// by `plan_upload_parts` are non-empty, adjacent, and ordered by `offset`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct UploadPart {
    /// Byte position in the source where this part begins.
    pub offset: u64,
    /// Number of bytes covered by this part.
    pub length: u64,
}
/// Reports whether an object of `size` bytes should be uploaded with a multipart put.
///
/// The cut-over is inclusive: a size exactly equal to `MULTIPART_PUT_THRESHOLD` already
/// qualifies.
pub(crate) fn should_use_multipart(size: u64) -> bool {
    MULTIPART_PUT_THRESHOLD <= size
}
/// Splits an upload of `size` bytes into ordered, contiguous parts.
///
/// The part size starts at `target_part_size` and is enlarged by `scale_part_size` when needed
/// to respect `max_parts`. Every part has that size except possibly a shorter trailing part
/// holding the remainder. Together, the parts cover `0..size` exactly.
///
/// Returns an empty plan when any of `size`, `target_part_size`, or `max_parts` is zero.
pub(crate) fn plan_upload_parts(
    size: u64,
    target_part_size: u64,
    max_parts: u64,
) -> Vec<UploadPart> {
    // Degenerate inputs yield no plan (and keep the divisions below well-defined).
    if [size, target_part_size, max_parts].contains(&0) {
        return Vec::new();
    }

    let part_size = scale_part_size(size, target_part_size, max_parts);
    let whole = size / part_size;
    let tail = size % part_size;

    let capacity = usize::try_from(whole).unwrap_or(usize::MAX) + usize::from(tail > 0);
    let mut plan = Vec::with_capacity(capacity);
    plan.extend((0..whole).map(|index| UploadPart {
        offset: index * part_size,
        length: part_size,
    }));
    if tail != 0 {
        // The remainder becomes one final, shorter part directly after the full ones.
        plan.push(UploadPart {
            offset: whole * part_size,
            length: tail,
        });
    }
    plan
}
pub(crate) async fn read_file_part(
file: Arc<std::fs::File>,
part: UploadPart,
) -> Result<Bytes, ObjectStoreError> {
let length = usize::try_from(part.length).map_err(other_boxed)?;
let buf = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
let mut buf = vec![0u8; length];
pread_exact(&file, &mut buf, part.offset)?;
Ok(buf)
})
.await
.map_err(other_boxed)?
.map_err(other_boxed)?;
Ok(Bytes::from(buf))
}
/// Fills `buf` completely with bytes read from `file` starting at byte `offset`.
///
/// The offset is passed explicitly on every read, so correctness does not depend on the file's
/// current cursor position. That makes concurrent calls on a shared `File` safe to interleave.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` if the file ends before `buf` is full, and propagates any
/// other I/O error. An interrupted read (`ErrorKind::Interrupted`) is retried rather than reported.
#[cfg(unix)]
fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> io::Result<()> {
    use std::os::unix::fs::FileExt;
    // `read_exact_at` already loops over short reads and retries `Interrupted`.
    file.read_exact_at(buf, offset)
}

/// Fills `buf` completely with bytes read from `file` starting at byte `offset`.
///
/// Windows counterpart of the Unix variant. `seek_read` may return short reads, so this loops
/// until `buf` is full. Note that `seek_read` also moves the file cursor, but each call positions
/// itself explicitly, so the cursor position is irrelevant here.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` if the file ends before `buf` is full, and propagates any
/// other I/O error. An interrupted read (`ErrorKind::Interrupted`) is retried rather than reported.
#[cfg(windows)]
fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> io::Result<()> {
    use std::os::windows::fs::FileExt;
    let mut filled = 0;
    while filled < buf.len() {
        match file.seek_read(&mut buf[filled..], offset + filled as u64) {
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            Ok(n) => filled += n,
            // Match the Unix `read_exact_at` contract: an interrupted read is simply retried.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
pub(crate) fn slice_bytes_part(body: &Bytes, part: UploadPart) -> Result<Bytes, ObjectStoreError> {
let offset = usize::try_from(part.offset).map_err(other_boxed)?;
let length = usize::try_from(part.length).map_err(other_boxed)?;
Ok(body.slice(offset..offset + length))
}
/// Picks the part size to use for an upload of `size` bytes.
///
/// Starts from `target_part_size` and doubles it until the upload fits in at most `max_parts`
/// parts (ceiling division). The result is therefore `target_part_size * 2^k` for the smallest
/// such `k`.
///
/// If doubling would overflow `u64` before the cap is met, this falls back to
/// `size.div_ceil(max_parts)`, the smallest size that honors the cap. That situation needs a part
/// size of at least 2^63, which only happens with `max_parts == 1`. Previously the overflow path
/// returned a size that still produced more than `max_parts` parts.
///
/// Degenerate inputs never panic:
/// - a zero `target_part_size` is treated as 1 byte, which avoids division by zero;
/// - `max_parts == 0` has no achievable cap, so the target is returned unchanged.
fn scale_part_size(size: u64, target_part_size: u64, max_parts: u64) -> u64 {
    let mut part_size = target_part_size.max(1);
    if max_parts == 0 {
        return part_size;
    }
    while size.div_ceil(part_size) > max_parts {
        match part_size.checked_mul(2) {
            Some(doubled) => part_size = doubled,
            None => return size.div_ceil(max_parts),
        }
    }
    part_size
}
// Unit tests for the threshold predicate, multipart part planning, and error propagation in
// `read_file_part`.
#[cfg(test)]
mod tests {
use super::*;
const KIB: u64 = 1024;
const MIB: u64 = 1024 * KIB;
// Sizes strictly below the threshold stay single-part.
#[test]
fn should_use_multipart_below_threshold_is_false() {
assert!(!should_use_multipart(0));
assert!(!should_use_multipart(MULTIPART_PUT_THRESHOLD - 1));
}
// The threshold is inclusive: exactly MULTIPART_PUT_THRESHOLD already goes multipart.
#[test]
fn should_use_multipart_at_or_above_threshold_is_true() {
assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD));
assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD + 1));
}
// Each zero input (size, target part size, max parts) yields an empty plan instead of panicking.
#[test]
fn plan_upload_parts_zero_size_is_empty() {
let parts = plan_upload_parts(0, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
assert!(parts.is_empty());
}
#[test]
fn plan_upload_parts_zero_target_part_size_is_empty() {
let parts = plan_upload_parts(MULTIPART_PUT_PART_SIZE, 0, S3_MAX_PARTS);
assert!(parts.is_empty());
}
#[test]
fn plan_upload_parts_zero_max_parts_is_empty() {
let parts = plan_upload_parts(MULTIPART_PUT_PART_SIZE, MULTIPART_PUT_PART_SIZE, 0);
assert!(parts.is_empty());
}
// An exact multiple produces no trailing zero-length part.
#[test]
fn plan_upload_parts_one_part_when_size_eq_part_size() {
let parts = plan_upload_parts(16 * MIB, 16 * MIB, S3_MAX_PARTS);
assert_eq!(
parts,
vec![UploadPart {
offset: 0,
length: 16 * MIB
}]
);
}
// One byte past a full part becomes its own 1-byte trailing part.
#[test]
fn plan_upload_parts_last_part_short() {
let parts = plan_upload_parts(16 * MIB + 1, 16 * MIB, S3_MAX_PARTS);
assert_eq!(
parts,
vec![
UploadPart {
offset: 0,
length: 16 * MIB,
},
UploadPart {
offset: 16 * MIB,
length: 1,
},
]
);
}
// 64 MiB / 16 MiB = exactly 4 full parts with no tail.
#[test]
fn plan_upload_parts_threshold_boundary_yields_expected_part_count() {
let parts = plan_upload_parts(
MULTIPART_PUT_THRESHOLD,
MULTIPART_PUT_PART_SIZE,
S3_MAX_PARTS,
);
assert_eq!(parts.len(), 4);
let total: u64 = parts.iter().map(|p| p.length).sum();
assert_eq!(total, MULTIPART_PUT_THRESHOLD);
for (i, p) in parts.iter().enumerate() {
assert_eq!(p.offset, (i as u64) * MULTIPART_PUT_PART_SIZE);
assert_eq!(p.length, MULTIPART_PUT_PART_SIZE);
}
}
// For a spread of sizes, parts must tile [0, size) exactly: contiguous offsets, non-empty
// lengths, and lengths summing to `size`.
#[test]
fn plan_upload_parts_lengths_sum_to_size() {
let cases = [
1_u64,
MULTIPART_PUT_PART_SIZE - 1,
MULTIPART_PUT_PART_SIZE,
MULTIPART_PUT_PART_SIZE + 1,
7 * MULTIPART_PUT_PART_SIZE + 17,
123 * MIB + 4567,
];
for size in cases {
let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
let total: u64 = parts.iter().map(|p| p.length).sum();
assert_eq!(total, size, "size={size}");
let mut expected_offset = 0_u64;
for p in &parts {
assert_eq!(p.offset, expected_offset, "size={size}");
assert!(p.length > 0);
expected_offset += p.length;
}
}
}
// 200 GiB at 16 MiB would need 12,800 parts (> 10,000), so the part size must double once to
// 32 MiB (6,400 parts).
#[test]
fn plan_upload_parts_scales_part_size_when_max_parts_exceeded() {
let size = 200 * 1024 * MIB;
let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
assert!(
(parts.len() as u64) <= S3_MAX_PARTS,
"parts.len()={} > S3_MAX_PARTS={S3_MAX_PARTS}",
parts.len(),
);
let total: u64 = parts.iter().map(|p| p.length).sum();
assert_eq!(total, size);
let expected_part_size = 32 * MIB;
for p in parts.iter().take(parts.len() - 1) {
assert_eq!(p.length, expected_part_size);
}
}
// The same 12,800 parts fit under the 50,000-block Azure cap, so no scaling occurs.
#[test]
fn plan_upload_parts_azure_block_cap_well_above_s3() {
let size = 200 * 1024 * MIB;
let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, AZURE_MAX_BLOCKS);
assert!((parts.len() as u64) <= AZURE_MAX_BLOCKS);
for p in parts.iter().take(parts.len() - 1) {
assert_eq!(p.length, MULTIPART_PUT_PART_SIZE);
}
}
// `file` is opened BEFORE the truncate, so the handle stays valid while the data disappears.
// The read at offset 1024 then hits EOF and must surface as an error, not as the zero-filled
// buffer `read_file_part` allocates.
#[tokio::test]
async fn read_file_part_propagates_eof_after_truncate() {
use std::io::Write;
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("truncated.bin");
{
let mut f = std::fs::File::create(&path).expect("create");
f.write_all(&[0u8; 4 * 1024]).expect("write");
}
let file = Arc::new(std::fs::File::open(&path).expect("open"));
std::fs::File::options()
.write(true)
.open(&path)
.expect("reopen for truncate")
.set_len(0)
.expect("truncate");
let part = UploadPart {
offset: 1024,
length: 1024,
};
let result = read_file_part(file, part).await;
assert!(
result.is_err(),
"expected read_file_part to error on truncated source, got Ok"
);
}
}