use std::{
collections::BTreeMap,
ffi::{OsStr, OsString},
fmt,
fs::File,
os::unix::prelude::{OsStrExt, OsStringExt},
path::PathBuf,
sync::Arc,
};
use anyhow::{Context, Result, bail, ensure};
use bytes::{Bytes, BytesMut};
use rustix::fs::makedev;
use tar_core::{
EntryType, HEADER_SIZE, PaxExtensions,
parse::{ParseEvent, Parser},
};
use tokio::{
io::{AsyncRead, AsyncReadExt},
sync::mpsc,
};
use composefs::{
INLINE_CONTENT_MAX_V0, dumpfile,
fsverity::FsVerityHashValue,
repository::{ObjectStoreMethod, Repository},
shared_internals::IO_BUF_CAPACITY,
splitstream::{SplitStreamBuilder, SplitStreamData, SplitStreamReader},
tree::{LeafContent, RegularFile, Stat},
};
use crate::ImportStats;
/// Extract the sub-second part of a PAX `mtime` record, in nanoseconds.
///
/// `pax` is the raw PAX extended-header payload of an entry. Only the first
/// `mtime` record is considered. Its fractional digits are scaled to
/// nanoseconds, keeping at most nine digits (extra precision is truncated, not
/// rounded).
///
/// Returns 0 when there is no `mtime` record, its value is unreadable, there is
/// no fractional part, or the fraction is not plain ASCII digits. The input
/// comes from untrusted archives, so this never panics.
fn pax_mtime_nsec(pax: &[u8]) -> u32 {
    let Some(mtime) = PaxExtensions::new(pax)
        .flatten()
        .find(|ext| ext.key_bytes() == b"mtime")
    else {
        return 0;
    };
    let Ok(value) = mtime.value() else { return 0 };
    value
        .split_once('.')
        .map_or(0, |(_, frac)| fraction_to_nsec(frac.as_bytes()))
}

/// Convert the digits after the decimal point of a PAX timestamp to nanoseconds.
///
/// Only the first nine bytes are significant. They must all be ASCII digits; a
/// shorter fraction is implicitly right-padded with zeros (`"5"` -> 500_000_000).
/// Anything else, including an empty fraction, yields 0.
fn fraction_to_nsec(frac: &[u8]) -> u32 {
    // Work on bytes rather than `str` slicing so that a non-ASCII fraction
    // cannot split a UTF-8 character and panic.
    let digits = &frac[..frac.len().min(9)];
    if digits.is_empty() || !digits.iter().all(u8::is_ascii_digit) {
        return 0;
    }
    // At most 9 digits: the value is < 10^9 and the scaled result stays < 10^9,
    // so u32 cannot overflow.
    let scale = 10u32.pow(9 - digits.len() as u32);
    digits
        .iter()
        .fold(0u32, |acc, &d| acc * 10 + u32::from(d - b'0'))
        * scale
}
fn receive_and_finalize_object<ObjectID: FsVerityHashValue>(
rx: mpsc::Receiver<Bytes>,
size: u64,
repo: &Repository<ObjectID>,
) -> Result<(ObjectID, ObjectStoreMethod)> {
use std::io::Write;
let tmpfile_fd = repo.create_object_tmpfile()?;
let mut tmpfile = std::io::BufWriter::with_capacity(IO_BUF_CAPACITY, File::from(tmpfile_fd));
let mut rx = rx;
while let Some(chunk) = rx.blocking_recv() {
tmpfile.write_all(&chunk)?;
}
let tmpfile = tmpfile.into_inner()?;
repo.finalize_object_tmpfile(tmpfile, size)
}
async fn stream_large_file<ObjectID: FsVerityHashValue>(
tx: mpsc::Sender<Bytes>,
handle: tokio::task::JoinHandle<Result<(ObjectID, ObjectStoreMethod)>>,
builder: &mut SplitStreamBuilder<ObjectID>,
buf: &mut BytesMut,
tar_stream: &mut (impl AsyncRead + Unpin),
actual_size: usize,
storage_size: usize,
) -> Result<()> {
let from_buf = std::cmp::min(buf.len(), actual_size);
if from_buf > 0 && tx.send(buf.split_to(from_buf).freeze()).await.is_err() {
drop(tx);
return handle
.await?
.map(|_| ())
.context("Object write task failed");
}
let mut remaining = actual_size.checked_sub(from_buf).unwrap();
while remaining > 0 {
buf.reserve(std::cmp::min(remaining, IO_BUF_CAPACITY));
let n = tar_stream.read_buf(buf).await?;
if n == 0 {
bail!("unexpected EOF reading tar entry");
}
let chunk_size = std::cmp::min(remaining, buf.len());
if tx.send(buf.split_to(chunk_size).freeze()).await.is_err() {
drop(tx);
return handle
.await?
.map(|_| ())
.context("Object write task failed");
}
remaining = remaining.checked_sub(chunk_size).unwrap();
}
drop(tx);
builder.push_external(handle, actual_size as u64);
let padding_size = storage_size.checked_sub(actual_size).unwrap();
if padding_size > 0 {
let pad_from_buf = std::cmp::min(buf.len(), padding_size);
if pad_from_buf > 0 {
builder.push_inline(&buf.split_to(pad_from_buf));
}
let stream_padding = padding_size - pad_from_buf;
if stream_padding > 0 {
buf.reserve(stream_padding);
while buf.len() < stream_padding {
let n = tar_stream.read_buf(buf).await?;
if n == 0 {
bail!("unexpected EOF reading tar padding");
}
}
builder.push_inline(&buf.split_to(stream_padding));
}
}
Ok(())
}
pub async fn split_async<ObjectID: FsVerityHashValue>(
mut tar_stream: impl AsyncRead + Unpin,
repo: Arc<Repository<ObjectID>>,
content_type: u64,
) -> Result<(ObjectID, ImportStats)> {
let semaphore = repo.write_semaphore();
let mut builder = SplitStreamBuilder::new(repo.clone(), content_type)?;
let mut parser = Parser::with_defaults();
let mut buf = BytesMut::with_capacity(IO_BUF_CAPACITY);
let mut need = HEADER_SIZE;
loop {
while buf.len() < need {
buf.reserve(need - buf.len());
let n = tar_stream.read_buf(&mut buf).await?;
if n == 0 {
if buf.is_empty() {
let (object_id, ss_stats) = builder.finish().await?;
return Ok((object_id, ImportStats::from_split_stream_stats(&ss_stats)));
}
bail!("unexpected EOF in tar stream");
}
}
match parser.parse(&buf)? {
ParseEvent::NeedData { min_bytes } => {
need = min_bytes;
continue;
}
ParseEvent::GlobalExtensions { consumed, .. } => {
builder.push_inline(&buf.split_to(consumed));
need = HEADER_SIZE;
continue;
}
ParseEvent::End { consumed } => {
builder.push_inline(&buf.split_to(consumed));
if !buf.is_empty() {
builder.push_inline(&buf.split());
}
loop {
buf.reserve(IO_BUF_CAPACITY);
let n = tar_stream.read_buf(&mut buf).await?;
if n == 0 {
break;
}
builder.push_inline(&buf.split());
}
break;
}
ParseEvent::SparseEntry { .. } => {
bail!("sparse tar entries are not supported");
}
ParseEvent::Entry { consumed, entry } => {
let actual_size = entry.size as usize;
let is_large_file =
entry.entry_type.is_file() && actual_size > INLINE_CONTENT_MAX_V0;
builder.push_inline(&buf.split_to(consumed));
let storage_size = actual_size.next_multiple_of(512);
if is_large_file {
let permit = semaphore.clone().acquire_owned().await?;
let (tx, rx) = mpsc::channel::<Bytes>(4);
let repo_clone = repo.clone();
let handle = tokio::task::spawn_blocking(move || {
let result =
receive_and_finalize_object(rx, actual_size as u64, &repo_clone);
drop(permit);
result
});
stream_large_file(
tx,
handle,
&mut builder,
&mut buf,
&mut tar_stream,
actual_size,
storage_size,
)
.await?;
} else {
if storage_size > 0 {
let from_buf = std::cmp::min(buf.len(), storage_size);
if from_buf > 0 {
builder.push_inline(&buf.split_to(from_buf));
}
let mut remaining = storage_size.checked_sub(from_buf).unwrap();
while remaining > 0 {
buf.reserve(std::cmp::min(remaining, IO_BUF_CAPACITY));
let n = tar_stream.read_buf(&mut buf).await?;
if n == 0 {
bail!("unexpected EOF reading tar entry");
}
let n = std::cmp::min(remaining, buf.len());
builder.push_inline(&buf.split_to(n));
remaining = remaining.checked_sub(n).unwrap();
}
}
}
need = HEADER_SIZE;
}
}
}
let (object_id, ss_stats) = builder.finish().await?;
Ok((object_id, ImportStats::from_split_stream_stats(&ss_stats)))
}
/// The kind-specific part of a tar entry read back from a splitstream.
#[derive(Debug)]
pub enum TarItem<ObjectID: FsVerityHashValue> {
    /// A directory; its metadata lives in the owning entry's `stat`.
    Directory,
    /// Any other supported non-hardlink entry: a regular file (inline or
    /// external object), symlink, block/character device, or FIFO.
    Leaf(LeafContent<ObjectID>),
    /// A hard link. Holds the link target normalized to an absolute path with
    /// trailing slashes removed, as done for entry paths.
    Hardlink(OsString),
}
/// One tar entry as reconstructed from a splitstream by `get_entry`.
#[derive(Debug)]
pub struct TarEntry<ObjectID: FsVerityHashValue> {
    /// Absolute path of the entry with trailing slashes removed; the archive
    /// root (`""` or `"/"` in the tar) becomes an empty path.
    pub path: PathBuf,
    /// Ownership, mode, modification time, and xattrs taken from the header.
    pub stat: Stat,
    /// What kind of entry this is, together with its content or link target.
    pub item: TarItem<ObjectID>,
}
impl<ObjectID: FsVerityHashValue> fmt::Display for TarEntry<ObjectID> {
    /// Render the entry in composefs dumpfile syntax using the `dumpfile` writers.
    ///
    /// Directories and leaves are written with a trailing argument of `1`
    /// (presumably the link count — TODO confirm against `dumpfile`); hard links
    /// are written with their target only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { path, stat, item } = self;
        match item {
            TarItem::Directory => dumpfile::write_directory(f, path, stat, 1),
            TarItem::Leaf(content) => dumpfile::write_leaf(f, path, stat, content, 1),
            TarItem::Hardlink(target) => dumpfile::write_hardlink(f, path, target),
        }
    }
}
/// Turn a raw tar member path into an absolute path.
///
/// One leading `/` is dropped, every trailing `/` is removed, and the result
/// gets a single `/` prefix. A path that is empty after trimming (the archive
/// root, e.g. `""` or `"/"`) maps to an empty `PathBuf` rather than `"/"`.
fn make_absolute_path(tar_path: &[u8]) -> PathBuf {
    let relative = match tar_path {
        [b'/', rest @ ..] => rest,
        whole => whole,
    };
    let kept = relative
        .iter()
        .rposition(|&byte| byte != b'/')
        .map_or(0, |last| last + 1);
    let trimmed = &relative[..kept];
    if trimmed.is_empty() {
        return PathBuf::new();
    }
    let mut absolute = Vec::with_capacity(trimmed.len() + 1);
    absolute.push(b'/');
    absolute.extend_from_slice(trimmed);
    PathBuf::from(OsString::from_vec(absolute))
}
/// Read the next tar entry from a splitstream produced by `split_async`.
///
/// Header blocks, including PAX/GNU extension headers, are read inline one
/// 512-byte block at a time until the parser yields a complete entry. The
/// entry's data is then fetched with `read_exact`, which yields either inline
/// bytes or an external object reference.
///
/// Returns `Ok(None)` at the end-of-archive marker, or when the inline data
/// runs out between entries.
///
/// # Errors
/// Fails on:
/// - parse errors, sparse entries, or unsupported entry types;
/// - device entries without device numbers;
/// - regular files stored on the wrong side of the inline threshold;
/// - sizes, uids, or gids that do not fit the target types;
/// - a stream that ends in the middle of a header.
pub fn get_entry<ObjectID: FsVerityHashValue>(
    reader: &mut SplitStreamReader<ObjectID>,
) -> Result<Option<TarEntry<ObjectID>>> {
    let mut parser = Parser::with_defaults();
    let mut header_buf: Vec<u8> = Vec::new();
    let mut block = [0u8; 512];
    loop {
        if !reader.read_inline_exact(&mut block)? {
            // Out of inline data. This is a clean end only if no header chain
            // is half-read. A lone zero block (a sloppy end-of-archive marker)
            // is tolerated.
            ensure!(
                header_buf.iter().all(|&b| b == 0),
                "splitstream ended in the middle of a tar header"
            );
            return Ok(None);
        }
        header_buf.extend_from_slice(&block);
        loop {
            match parser.parse(&header_buf)? {
                ParseEvent::NeedData { .. } => break,
                ParseEvent::GlobalExtensions { consumed, .. } => {
                    header_buf.drain(..consumed);
                }
                ParseEvent::End { .. } => return Ok(None),
                ParseEvent::SparseEntry { .. } => {
                    bail!("Sparse tar entries are not supported");
                }
                ParseEvent::Entry { entry, .. } => {
                    let size = entry.size;
                    let stored_size = size
                        .checked_next_multiple_of(512)
                        .context("tar entry size overflows when padded to 512 bytes")?;
                    let content_len =
                        usize::try_from(size).context("tar entry size does not fit in usize")?;
                    let stored_len = usize::try_from(stored_size)
                        .context("tar entry size does not fit in usize")?;
                    // Only used by device entries, but cheap to compute here.
                    let rdev = entry
                        .dev_major
                        .zip(entry.dev_minor)
                        .map(|(major, minor)| makedev(major, minor));
                    let item = match reader.read_exact(content_len, stored_len)? {
                        SplitStreamData::External(id) => match entry.entry_type {
                            EntryType::Regular | EntryType::Continuous => {
                                ensure!(
                                    content_len > INLINE_CONTENT_MAX_V0,
                                    "Splitstream incorrectly stored a small ({size} byte) file external"
                                );
                                TarItem::Leaf(LeafContent::Regular(RegularFile::External(id, size)))
                            }
                            _ => bail!(
                                "Unsupported external-chunked entry {:?} {id:?}",
                                entry.entry_type
                            ),
                        },
                        SplitStreamData::Inline(content) => match entry.entry_type {
                            EntryType::Directory => TarItem::Directory,
                            EntryType::Regular | EntryType::Continuous => {
                                ensure!(
                                    content.len() <= INLINE_CONTENT_MAX_V0,
                                    "Splitstream incorrectly stored a large ({} byte) file inline",
                                    content.len()
                                );
                                TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(content)))
                            }
                            EntryType::Link => TarItem::Hardlink({
                                let link_target = entry.link_target.as_deref().unwrap_or_default();
                                make_absolute_path(link_target).into_os_string()
                            }),
                            EntryType::Symlink => TarItem::Leaf(LeafContent::Symlink({
                                let link_target = entry.link_target.as_deref().unwrap_or_default();
                                OsStr::from_bytes(link_target).into()
                            })),
                            EntryType::Block => TarItem::Leaf(LeafContent::BlockDevice(
                                rdev.context("Device entry without device numbers?")?,
                            )),
                            EntryType::Char => TarItem::Leaf(LeafContent::CharacterDevice(
                                rdev.context("Device entry without device numbers?")?,
                            )),
                            EntryType::Fifo => TarItem::Leaf(LeafContent::Fifo),
                            _ => {
                                bail!("Unsupported entry type {:?}", entry.entry_type);
                            }
                        },
                    };
                    // Reject rather than truncate: an `as u32` cast would map
                    // e.g. uid 2^32 onto uid 0 (root).
                    let st_uid = u32::try_from(entry.uid)
                        .context("tar entry uid does not fit in 32 bits")?;
                    let st_gid = u32::try_from(entry.gid)
                        .context("tar entry gid does not fit in 32 bits")?;
                    let xattrs: BTreeMap<_, _> = entry
                        .xattrs
                        .into_iter()
                        .map(|(k, v)| (Box::from(OsStr::from_bytes(&k)), Box::from(v.as_ref())))
                        .collect();
                    return Ok(Some(TarEntry {
                        path: make_absolute_path(&entry.path),
                        stat: Stat {
                            st_uid,
                            st_gid,
                            st_mode: entry.mode,
                            st_mtim_sec: entry.mtime as i64,
                            st_mtim_nsec: entry.pax.map_or(0, pax_mtime_nsec),
                            xattrs,
                        },
                        item,
                    }));
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
use crate::TAR_LAYER_CONTENT_TYPE;
use super::*;
use composefs::{
fsverity::Sha256HashValue,
generic_tree::LeafContent,
repository::{Repository, RepositoryConfig},
splitstream::SplitStreamReader,
};
use std::{io::Read, path::Path, sync::Arc};
use tar::Builder;
use once_cell::sync::Lazy;
use std::sync::Mutex;
static TEST_TEMPDIRS: Lazy<Mutex<Vec<tempfile::TempDir>>> =
Lazy::new(|| Mutex::new(Vec::new()));
pub(crate) fn create_test_repository() -> Result<Arc<Repository<Sha256HashValue>>> {
let tempdir = tempfile::TempDir::new().unwrap();
let repo_path = tempdir.path().join("repo");
let (repo, _) = Repository::init_path(
rustix::fs::CWD,
&repo_path,
RepositoryConfig::default().set_insecure(),
)?;
{
let mut guard = TEST_TEMPDIRS.lock().unwrap();
guard.push(tempdir);
}
Ok(Arc::new(repo))
}
fn append_file(
builder: &mut Builder<&mut Vec<u8>>,
path: &str,
content: &[u8],
) -> Result<tar::Header> {
let mut header = tar::Header::new_gnu();
header.set_mode(0o644);
header.set_uid(1000);
header.set_gid(1000);
header.set_mtime(1234567890);
header.set_size(content.len() as u64);
header.set_entry_type(tar::EntryType::Regular);
builder.append_data(&mut header, path, content)?;
Ok(header)
}
async fn read_all_via_splitstream(tar_data: Vec<u8>) -> Result<Vec<TarEntry<Sha256HashValue>>> {
let repo = create_test_repository()?;
let (object_id, _stats) =
split_async(&tar_data[..], repo.clone(), TAR_LAYER_CONTENT_TYPE).await?;
let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
repo.open_object(&object_id)?.into(),
Some(TAR_LAYER_CONTENT_TYPE),
)?;
let mut entries = Vec::new();
while let Some(entry) = get_entry(&mut reader)? {
entries.push(entry);
}
Ok(entries)
}
#[test]
fn test_pax_mtime_nsec_parsing() {
let pax = b"30 mtime=1234567890.123456789\n";
assert_eq!(pax_mtime_nsec(pax), 123_456_789, "9-digit fraction");
let pax = b"22 mtime=1234567890.5\n";
assert_eq!(pax_mtime_nsec(pax), 500_000_000, "1-digit fraction");
let pax = b"30 mtime=1234567890.000000001\n";
assert_eq!(pax_mtime_nsec(pax), 1, "trailing single non-zero digit");
let pax = b"31 mtime=1234567890.1234567899\n";
assert_eq!(
pax_mtime_nsec(pax),
123_456_789,
"10-digit fraction truncated"
);
let pax = b"20 mtime=1234567890\n";
assert_eq!(pax_mtime_nsec(pax), 0, "no fractional part");
let pax = b"16 path=foo.txt\n";
assert_eq!(pax_mtime_nsec(pax), 0, "no mtime key");
assert_eq!(pax_mtime_nsec(b""), 0, "empty pax");
}
#[test]
fn test_make_absolute_path() {
let cases: &[(&[u8], &str)] = &[
(b"foo/bar", "/foo/bar"),
(b"/foo/bar", "/foo/bar"),
(b"dir/", "/dir"),
(b"/dir/", "/dir"),
(b"a", "/a"),
(b"/a", "/a"),
(
b"usr/lib/python3/dist-packages/foo",
"/usr/lib/python3/dist-packages/foo",
),
(b"dir//", "/dir"),
(b"file.txt", "/file.txt"),
(b"a/b/c/", "/a/b/c"),
(b"", ""),
(b"/", ""),
];
for (input, expected) in cases {
assert_eq!(
make_absolute_path(input),
PathBuf::from(expected),
"make_absolute_path({:?})",
String::from_utf8_lossy(input),
);
}
}
#[tokio::test]
async fn test_empty_tar() {
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
builder.finish().unwrap();
}
let repo = create_test_repository().unwrap();
let (object_id, stats) = split_async(&tar_data[..], repo.clone(), TAR_LAYER_CONTENT_TYPE)
.await
.unwrap();
assert_eq!(
stats.objects_copied, 0,
"empty tar should have no external objects"
);
let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
repo.open_object(&object_id).unwrap().into(),
Some(TAR_LAYER_CONTENT_TYPE),
)
.unwrap();
assert!(get_entry(&mut reader).unwrap().is_none());
}
#[test]
fn test_no_record_padding_roundtrip() {
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
append_file(&mut builder, "hello.txt", b"hello world").unwrap();
builder.finish().unwrap();
}
const GNU_RECORD_SIZE: usize = 20 * 512;
assert_ne!(
tar_data.len() % GNU_RECORD_SIZE,
0,
"expected tar without GNU record padding for this test"
);
roundtrip_tar_bytes(&tar_data);
}
#[test]
fn test_gnu_record_padding_roundtrip() {
const GNU_RECORD_SIZE: usize = 20 * 512;
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
append_file(&mut builder, "hello.txt", b"hello world").unwrap();
builder.finish().unwrap();
}
let remainder = tar_data.len() % GNU_RECORD_SIZE;
if remainder != 0 {
tar_data.resize(tar_data.len() + (GNU_RECORD_SIZE - remainder), 0);
}
assert_eq!(tar_data.len() % GNU_RECORD_SIZE, 0);
roundtrip_tar_bytes(&tar_data);
}
#[tokio::test]
async fn test_single_small_file() {
let mut tar_data = Vec::new();
let original_header = {
let mut builder = Builder::new(&mut tar_data);
let content = b"Hello, World!";
let header = append_file(&mut builder, "hello.txt", content).unwrap();
builder.finish().unwrap();
header
};
let repo = create_test_repository().unwrap();
let (object_id, stats) = split_async(&tar_data[..], repo.clone(), TAR_LAYER_CONTENT_TYPE)
.await
.unwrap();
assert_eq!(
stats.objects_copied, 0,
"small file should be inline, not external"
);
let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
repo.open_object(&object_id).unwrap().into(),
Some(TAR_LAYER_CONTENT_TYPE),
)
.unwrap();
let entry = get_entry(&mut reader)
.unwrap()
.expect("Should have one entry");
assert_eq!(entry.path, PathBuf::from("/hello.txt"));
assert!(matches!(
entry.item,
TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(_)))
));
assert_header_stat_equal(&original_header, &entry.stat, "hello.txt");
if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) = entry.item {
assert_eq!(content.as_ref(), b"Hello, World!");
}
assert!(get_entry(&mut reader).unwrap().is_none());
}
#[tokio::test]
async fn test_inline_threshold() {
let mut tar_data = Vec::new();
let (threshold_header, over_threshold_header) = {
let mut builder = Builder::new(&mut tar_data);
let threshold_content = vec![b'X'; INLINE_CONTENT_MAX_V0];
let header1 =
append_file(&mut builder, "threshold_file.txt", &threshold_content).unwrap();
let over_threshold_content = vec![b'Y'; INLINE_CONTENT_MAX_V0 + 1];
let header2 = append_file(
&mut builder,
"over_threshold_file.txt",
&over_threshold_content,
)
.unwrap();
builder.finish().unwrap();
(header1, header2)
};
let repo = create_test_repository().unwrap();
let (object_id, stats) = split_async(&tar_data[..], repo.clone(), TAR_LAYER_CONTENT_TYPE)
.await
.unwrap();
assert_eq!(
stats.objects_copied, 1,
"one file over threshold should be external"
);
let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
repo.open_object(&object_id).unwrap().into(),
Some(TAR_LAYER_CONTENT_TYPE),
)
.unwrap();
let mut object_refs = Vec::new();
reader
.get_object_refs(|id| object_refs.push(id.clone()))
.unwrap();
assert_eq!(
object_refs.len(),
1,
"should have exactly 1 external object ref"
);
let mut entries = Vec::new();
while let Some(entry) = get_entry(&mut reader).unwrap() {
entries.push(entry);
}
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].path, PathBuf::from("/threshold_file.txt"));
assert_header_stat_equal(&threshold_header, &entries[0].stat, "threshold_file.txt");
if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) =
entries[0].item
{
assert_eq!(content.len(), INLINE_CONTENT_MAX_V0);
assert_eq!(content[0], b'X');
} else {
panic!("Expected inline regular file for threshold file");
}
assert_eq!(entries[1].path, PathBuf::from("/over_threshold_file.txt"));
assert_header_stat_equal(
&over_threshold_header,
&entries[1].stat,
"over_threshold_file.txt",
);
if let TarItem::Leaf(LeafContent::Regular(RegularFile::External(_, size))) = entries[1].item
{
assert_eq!(size, (INLINE_CONTENT_MAX_V0 + 1) as u64);
} else {
panic!("Expected external regular file for over-threshold file");
}
}
#[tokio::test]
async fn test_round_trip_simple() {
let mut original_tar = Vec::new();
let (small_header, large_header) = {
let mut builder = Builder::new(&mut original_tar);
let small_content = b"Small file content";
let header1 = append_file(&mut builder, "small.txt", small_content).unwrap();
let large_content = vec![b'L'; INLINE_CONTENT_MAX_V0 + 100];
let header2 = append_file(&mut builder, "large.txt", &large_content).unwrap();
builder.finish().unwrap();
(header1, header2)
};
let repo = create_test_repository().unwrap();
let (object_id, stats) =
split_async(&original_tar[..], repo.clone(), TAR_LAYER_CONTENT_TYPE)
.await
.unwrap();
assert_eq!(
stats.objects_copied, 1,
"only the large file should be external"
);
let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
repo.open_object(&object_id).unwrap().into(),
Some(TAR_LAYER_CONTENT_TYPE),
)
.unwrap();
let mut object_refs = Vec::new();
reader
.get_object_refs(|id| object_refs.push(id.clone()))
.unwrap();
assert_eq!(
object_refs.len(),
1,
"should have exactly 1 external object ref"
);
let mut entries = Vec::new();
while let Some(entry) = get_entry(&mut reader).unwrap() {
entries.push(entry);
}
assert_eq!(entries.len(), 2, "Should have exactly 2 entries");
assert_eq!(entries[0].path, PathBuf::from("/small.txt"));
assert_header_stat_equal(&small_header, &entries[0].stat, "small.txt");
if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) =
entries[0].item
{
assert_eq!(content.as_ref(), b"Small file content");
} else {
panic!("Expected inline regular file for small.txt");
}
assert_eq!(entries[1].path, PathBuf::from("/large.txt"));
assert_header_stat_equal(&large_header, &entries[1].stat, "large.txt");
if let TarItem::Leaf(LeafContent::Regular(RegularFile::External(ref id, size))) =
entries[1].item
{
assert_eq!(size, (INLINE_CONTENT_MAX_V0 + 100) as u64);
use std::io::Read;
let mut external_data = Vec::new();
std::fs::File::from(repo.open_object(id).unwrap())
.read_to_end(&mut external_data)
.unwrap();
let expected_content = vec![b'L'; INLINE_CONTENT_MAX_V0 + 100];
assert_eq!(
external_data, expected_content,
"External file content should match"
);
} else {
panic!("Expected external regular file for large.txt");
}
}
#[tokio::test]
async fn test_special_filename_cases() {
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
let content1 = b"Special chars content";
append_file(&mut builder, "file-with_special.chars@123", content1).unwrap();
let long_name = "a".repeat(100);
let content2 = b"Long filename content";
append_file(&mut builder, &long_name, content2).unwrap();
builder.finish().unwrap();
};
let entries = read_all_via_splitstream(tar_data).await.unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(
entries[0].path,
PathBuf::from("/file-with_special.chars@123")
);
assert_eq!(
entries[0].path.file_name().unwrap(),
"file-with_special.chars@123"
);
let expected_long_path = format!("/{}", "a".repeat(100));
assert_eq!(entries[1].path, PathBuf::from(expected_long_path));
assert_eq!(entries[1].path.file_name().unwrap(), &*"a".repeat(100));
}
#[tokio::test]
async fn test_gnu_long_filename_reproduction() {
let very_long_path = format!(
"very/long/path/that/exceeds/the/normal/tar/header/limit/{}",
"x".repeat(120)
);
let content = b"Content for very long path";
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
append_file(&mut builder, &very_long_path, content).unwrap();
builder.finish().unwrap();
};
let entries = read_all_via_splitstream(tar_data).await.unwrap();
assert_eq!(entries.len(), 1);
let abspath = format!("/{very_long_path}");
assert_eq!(entries[0].path, Path::new(&abspath));
}
#[tokio::test]
async fn test_gnu_longlink() {
let very_long_path = format!(
"very/long/path/that/exceeds/the/normal/tar/header/limit/{}",
"x".repeat(120)
);
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
let mut header = tar::Header::new_gnu();
header.set_mode(0o777);
header.set_entry_type(tar::EntryType::Symlink);
header.set_size(0);
header.set_uid(0);
header.set_gid(0);
builder
.append_link(&mut header, "long-symlink", &very_long_path)
.unwrap();
builder.finish().unwrap();
};
let entries = read_all_via_splitstream(tar_data).await.unwrap();
assert_eq!(entries.len(), 1);
match &entries[0].item {
TarItem::Leaf(LeafContent::Symlink(target)) => {
assert_eq!(&**target, OsStr::new(&very_long_path));
}
_ => unreachable!(),
};
}
fn assert_header_stat_equal(header: &tar::Header, stat: &Stat, msg_prefix: &str) {
assert_eq!(
header.mode().unwrap(),
stat.st_mode,
"{msg_prefix}: mode mismatch"
);
assert_eq!(
header.uid().unwrap() as u32,
stat.st_uid,
"{msg_prefix}: uid mismatch"
);
assert_eq!(
header.gid().unwrap() as u32,
stat.st_gid,
"{msg_prefix}: gid mismatch"
);
assert_eq!(
header.mtime().unwrap() as i64,
stat.st_mtim_sec,
"{msg_prefix}: mtime mismatch"
);
}
#[test]
#[ignore]
fn bench_tar_split() {
use std::time::Instant;
const NUM_FILES: usize = 10000;
const FILE_SIZE: usize = 200 * 1024; const ITERATIONS: usize = 3;
println!("\n=== Tar Split Benchmark ===");
println!(
"Configuration: {} files of {}KB each, {} iterations",
NUM_FILES,
FILE_SIZE / 1024,
ITERATIONS
);
fn generate_test_data(size: usize, seed: u8) -> Vec<u8> {
(0..size)
.map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
.collect()
}
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
for i in 0..NUM_FILES {
let content = generate_test_data(FILE_SIZE, i as u8);
let filename = format!("file_{:04}.bin", i);
append_file(&mut builder, &filename, &content).unwrap();
}
builder.finish().unwrap();
}
let tar_size = tar_data.len();
println!(
"Tar archive size: {} bytes ({:.2} MB)",
tar_size,
tar_size as f64 / (1024.0 * 1024.0)
);
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let mut times = Vec::with_capacity(ITERATIONS);
for i in 0..ITERATIONS {
let repo = create_test_repository().unwrap();
let tar_data_clone = tar_data.clone();
let start = Instant::now();
rt.block_on(async {
split_async(&tar_data_clone[..], repo, TAR_LAYER_CONTENT_TYPE)
.await
.map(|(id, _stats)| id)
})
.unwrap();
let elapsed = start.elapsed();
times.push(elapsed);
println!("Iteration {}: {:?}", i + 1, elapsed);
}
let total: std::time::Duration = times.iter().sum();
let avg = total / ITERATIONS as u32;
println!("\n=== Summary ===");
println!(
"Average: {:?} ({:.2} MB/s)",
avg,
(tar_size as f64 / (1024.0 * 1024.0)) / avg.as_secs_f64()
);
}
#[tokio::test]
async fn test_split_streaming_roundtrip() {
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
let small_content = b"Small file content";
append_file(&mut builder, "small.txt", small_content).unwrap();
let large_content = vec![b'L'; INLINE_CONTENT_MAX_V0 + 100];
append_file(&mut builder, "large.txt", &large_content).unwrap();
let small2_content = b"Another small file";
append_file(&mut builder, "small2.txt", small2_content).unwrap();
builder.finish().unwrap();
}
let repo = create_test_repository().unwrap();
let (object_id, stats) = split_async(&tar_data[..], repo.clone(), TAR_LAYER_CONTENT_TYPE)
.await
.unwrap();
assert_eq!(
stats.objects_copied, 1,
"only the large file should be external"
);
let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
repo.open_object(&object_id).unwrap().into(),
Some(TAR_LAYER_CONTENT_TYPE),
)
.unwrap();
let mut object_refs = Vec::new();
reader
.get_object_refs(|id| object_refs.push(id.clone()))
.unwrap();
assert_eq!(
object_refs.len(),
1,
"should have exactly 1 external object ref"
);
let mut entries = Vec::new();
while let Some(entry) = get_entry(&mut reader).unwrap() {
entries.push(entry);
}
assert_eq!(entries.len(), 3, "Should have 3 entries");
assert_eq!(entries[0].path, PathBuf::from("/small.txt"));
if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) =
entries[0].item
{
assert_eq!(content.as_ref(), b"Small file content");
} else {
panic!("Expected inline regular file for small.txt");
}
assert_eq!(entries[1].path, PathBuf::from("/large.txt"));
if let TarItem::Leaf(LeafContent::Regular(RegularFile::External(ref id, size))) =
entries[1].item
{
assert_eq!(size, (INLINE_CONTENT_MAX_V0 + 100) as u64);
let mut external_data = Vec::new();
std::fs::File::from(repo.open_object(id).unwrap())
.read_to_end(&mut external_data)
.unwrap();
let expected_content = vec![b'L'; INLINE_CONTENT_MAX_V0 + 100];
assert_eq!(
external_data, expected_content,
"External file content should match"
);
} else {
panic!("Expected external regular file for large.txt");
}
assert_eq!(entries[2].path, PathBuf::from("/small2.txt"));
if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) =
entries[2].item
{
assert_eq!(content.as_ref(), b"Another small file");
} else {
panic!("Expected inline regular file for small2.txt");
}
}
#[tokio::test]
async fn test_split_streaming_multiple_large_files() {
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
for i in 0..3 {
let content = vec![(i + 0x41) as u8; INLINE_CONTENT_MAX_V0 + 1000]; let filename = format!("file{}.bin", i);
append_file(&mut builder, &filename, &content).unwrap();
}
builder.finish().unwrap();
}
let repo = create_test_repository().unwrap();
let (object_id, stats) = split_async(&tar_data[..], repo.clone(), TAR_LAYER_CONTENT_TYPE)
.await
.unwrap();
assert_eq!(
stats.objects_copied, 3,
"all 3 large files should be external"
);
let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
repo.open_object(&object_id).unwrap().into(),
Some(TAR_LAYER_CONTENT_TYPE),
)
.unwrap();
let mut object_refs = Vec::new();
reader
.get_object_refs(|id| object_refs.push(id.clone()))
.unwrap();
assert_eq!(
object_refs.len(),
3,
"should have exactly 3 external object refs"
);
let mut entries = Vec::new();
while let Some(entry) = get_entry(&mut reader).unwrap() {
entries.push(entry);
}
assert_eq!(entries.len(), 3, "Should have 3 entries");
for (i, entry) in entries.iter().enumerate() {
let expected_path = format!("/file{}.bin", i);
assert_eq!(entry.path, PathBuf::from(&expected_path));
if let TarItem::Leaf(LeafContent::Regular(RegularFile::External(ref id, size))) =
entry.item
{
assert_eq!(size, (INLINE_CONTENT_MAX_V0 + 1000) as u64);
let mut external_data = Vec::new();
std::fs::File::from(repo.open_object(id).unwrap())
.read_to_end(&mut external_data)
.unwrap();
let expected_content = vec![(i + 0x41) as u8; INLINE_CONTENT_MAX_V0 + 1000];
assert_eq!(
external_data, expected_content,
"External file {} content should match",
i
);
} else {
panic!("Expected external regular file for file{}.bin", i);
}
}
}
#[tokio::test]
async fn test_longpath_formats() {
let cases: &[(&str, fn() -> String, bool)] = &[
("short path", || "short.txt".to_string(), false),
("exactly 100 chars", || "x".repeat(100), false),
(
"ustar prefix",
|| format!("{}/{}", "dir".repeat(40), "file.txt"),
false,
),
(
"max ustar (~254 chars)",
|| format!("{}/{}", "p".repeat(154), "n".repeat(99)),
false,
),
(
"gnu longname",
|| format!("{}/{}", "a".repeat(80), "b".repeat(50)),
true,
),
(
"pax extended",
|| format!("{}/{}", "sub/".repeat(60), "file.txt"),
false,
),
];
for (desc, make_path, use_gnu) in cases {
let path = make_path();
let content = b"test content";
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
let mut header = if *use_gnu {
tar::Header::new_gnu()
} else {
tar::Header::new_ustar()
};
header.set_mode(0o644);
header.set_uid(1000);
header.set_gid(1000);
header.set_mtime(1234567890);
header.set_size(content.len() as u64);
header.set_entry_type(tar::EntryType::Regular);
builder
.append_data(&mut header, &path, &content[..])
.unwrap();
builder.finish().unwrap();
}
let entries = read_all_via_splitstream(tar_data).await.unwrap();
assert_eq!(entries.len(), 1, "{desc}: expected 1 entry");
assert_eq!(
entries[0].path,
PathBuf::from(format!("/{}", path)),
"{desc}: path mismatch (len={})",
path.len()
);
}
}
#[tokio::test]
async fn test_longpath_hardlinks() {
let cases: &[(&str, fn() -> String, bool)] = &[
("short target", || "target.txt".to_string(), true),
(
"gnu longlink",
|| format!("{}/{}", "c".repeat(80), "d".repeat(50)),
true,
),
(
"pax linkpath",
|| format!("{}/{}", "sub/".repeat(60), "target.txt"),
false,
),
];
for (desc, make_target, use_gnu) in cases {
let target_path = make_target();
let link_name = "hardlink";
let content = b"target content";
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
let mut header = if *use_gnu {
tar::Header::new_gnu()
} else {
tar::Header::new_ustar()
};
header.set_mode(0o644);
header.set_uid(1000);
header.set_gid(1000);
header.set_mtime(1234567890);
header.set_size(content.len() as u64);
header.set_entry_type(tar::EntryType::Regular);
builder
.append_data(&mut header, &target_path, &content[..])
.unwrap();
let mut link_header = if *use_gnu {
tar::Header::new_gnu()
} else {
tar::Header::new_ustar()
};
link_header.set_mode(0o644);
link_header.set_uid(1000);
link_header.set_gid(1000);
link_header.set_mtime(1234567890);
link_header.set_size(0);
link_header.set_entry_type(tar::EntryType::Link);
builder
.append_link(&mut link_header, link_name, &target_path)
.unwrap();
builder.finish().unwrap();
}
let entries = read_all_via_splitstream(tar_data).await.unwrap();
assert_eq!(entries.len(), 2, "{desc}: expected 2 entries");
assert_eq!(
entries[0].path,
PathBuf::from(format!("/{}", target_path)),
"{desc}"
);
assert_eq!(
entries[1].path,
PathBuf::from(format!("/{}", link_name)),
"{desc}"
);
match &entries[1].item {
TarItem::Hardlink(target) => {
assert_eq!(
target.to_str().unwrap(),
format!("/{}", target_path),
"{desc}: hardlink target mismatch"
);
}
_ => panic!("{desc}: expected hardlink entry"),
}
}
}
#[tokio::test]
async fn test_ustar_prefix_field_used() {
    // Byte offset and width of the UStar `prefix` field within a 512-byte header.
    const USTAR_PREFIX_OFFSET: usize = 345;
    const USTAR_PREFIX_LEN: usize = 155;

    let dir_path =
        "usr/lib/python3.12/site-packages/some-very-long-package-name-here/__pycache__/subdir";
    let filename = "module_name_with_extra_stuff.cpython-312.opt-2.pyc";
    let full_path = format!("{dir_path}/{filename}");

    // Preconditions: the whole path overflows the 100-byte `name` field while the
    // basename alone fits, so `set_path` must split the directory into `prefix`.
    assert!(
        full_path.len() > 100,
        "full path must exceed 100 chars to use prefix"
    );
    assert!(filename.len() <= 100, "filename must fit in name field");

    let mut archive = Vec::new();
    {
        let mut header = tar::Header::new_ustar();
        header.set_mode(0o644);
        header.set_size(4);
        header.set_entry_type(tar::EntryType::Regular);
        header.set_path(&full_path).unwrap();
        // `append` writes the header verbatim, so the checksum must be set here.
        header.set_cksum();

        let mut writer = Builder::new(&mut archive);
        writer.append(&header, b"test".as_slice()).unwrap();
        writer.finish().unwrap();
    }

    // Inspect the raw header bytes to prove the prefix field really was used.
    let raw_prefix = &archive[USTAR_PREFIX_OFFSET..USTAR_PREFIX_OFFSET + USTAR_PREFIX_LEN];
    let prefix = std::str::from_utf8(raw_prefix)
        .unwrap()
        .trim_end_matches('\0');
    assert_eq!(
        prefix, dir_path,
        "UStar prefix field should contain directory"
    );

    // The importer must rejoin prefix + name into the full absolute path.
    let entries = read_all_via_splitstream(archive).await.unwrap();
    assert_eq!(entries[0].path, PathBuf::from(format!("/{full_path}")));
}
/// Imports `tar_data` into a fresh test repository with `split_async`, reads the
/// resulting splitstream back with `SplitStreamReader::cat`, and asserts that the
/// reassembled bytes equal the input exactly.
///
/// Builds its own tokio runtime, so it can be called from synchronous
/// (e.g. proptest) test bodies.
fn roundtrip_tar_bytes(tar_data: &[u8]) {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let repo = create_test_repository().unwrap();

        let split_result = split_async(tar_data, repo.clone(), TAR_LAYER_CONTENT_TYPE).await;
        let (stream_id, _stats) = split_result.unwrap();

        let stream_fd = repo.open_object(&stream_id).unwrap();
        let mut stream_reader: SplitStreamReader<Sha256HashValue> =
            SplitStreamReader::new(stream_fd.into(), Some(TAR_LAYER_CONTENT_TYPE)).unwrap();

        let mut output = Vec::new();
        stream_reader.cat(&repo, &mut output).unwrap();

        // Compare lengths first: a length mismatch gives a far more readable
        // failure than a diff of two large byte vectors.
        assert_eq!(
            output.len(),
            tar_data.len(),
            "reassembled tar length mismatch"
        );
        assert_eq!(
            output, tar_data,
            "reassembled tar bytes differ from original"
        );
    });
}
mod proptest_tests {
    //! Property-based tests covering tar path encodings (plain UStar names,
    //! UStar prefix splitting, and long-name fallbacks for paths that fit
    //! neither) and byte-exact tar -> splitstream -> tar round-tripping.

    use super::*;
    use proptest::prelude::*;

    /// One path component: 1-31 characters from `[a-zA-Z0-9_.-]`, never
    /// starting with `.`, so `.`/`..` components are never produced.
    fn path_component() -> impl Strategy<Value = String> {
        // The mandatory leading character class guarantees a non-empty string,
        // so no rejection filter is needed here.
        proptest::string::string_regex("[a-zA-Z0-9_][a-zA-Z0-9_.-]{0,30}").expect("valid regex")
    }

    /// A relative path of 1-19 components joined by `/`, kept only if its byte
    /// length lies in `min_len..=max_len`.
    ///
    /// This is a rejection filter: ranges far from the typical generated length
    /// produce many local rejects.
    fn path_with_length(min_len: usize, max_len: usize) -> impl Strategy<Value = String> {
        prop::collection::vec(path_component(), 1..20)
            .prop_map(|components| components.join("/"))
            .prop_filter("length in range", move |p| {
                (min_len..=max_len).contains(&p.len())
            })
    }

    /// A UStar header carrying the fixed metadata every entry in these tests
    /// uses (mode 0644, uid/gid 1000, constant mtime), plus the given entry
    /// type and size. The path is set later by `append_data`/`append_link`.
    fn ustar_header(entry_type: tar::EntryType, size: u64) -> tar::Header {
        let mut header = tar::Header::new_ustar();
        header.set_mode(0o644);
        header.set_uid(1000);
        header.set_gid(1000);
        header.set_mtime(1234567890);
        header.set_size(size);
        header.set_entry_type(entry_type);
        header
    }

    /// Archives a single regular file at `path` and asserts that the
    /// splitstream reader reports exactly one entry at `/{path}`.
    fn roundtrip_path(path: &str) {
        let content = b"proptest content";
        let mut tar_data = Vec::new();
        {
            let mut builder = Builder::new(&mut tar_data);
            let mut header = ustar_header(tar::EntryType::Regular, content.len() as u64);
            builder
                .append_data(&mut header, path, &content[..])
                .unwrap();
            builder.finish().unwrap();
        }
        let rt = tokio::runtime::Runtime::new().unwrap();
        let entries = rt.block_on(read_all_via_splitstream(tar_data)).unwrap();
        assert_eq!(entries.len(), 1, "expected 1 entry for path: {path}");
        assert_eq!(
            entries[0].path,
            PathBuf::from(format!("/{path}")),
            "path mismatch"
        );
    }

    /// Archives a regular file at `target_path` plus a hardlink named `link`
    /// pointing at it, then asserts both entry paths and that the link's
    /// target resolves to `/{target_path}`.
    fn roundtrip_hardlink(target_path: &str) {
        let link_name = "link";
        let content = b"target content";
        let mut tar_data = Vec::new();
        {
            let mut builder = Builder::new(&mut tar_data);
            let mut header = ustar_header(tar::EntryType::Regular, content.len() as u64);
            builder
                .append_data(&mut header, target_path, &content[..])
                .unwrap();
            // Hardlinks carry no data, hence size 0.
            let mut link_header = ustar_header(tar::EntryType::Link, 0);
            builder
                .append_link(&mut link_header, link_name, target_path)
                .unwrap();
            builder.finish().unwrap();
        }
        let rt = tokio::runtime::Runtime::new().unwrap();
        let entries = rt.block_on(read_all_via_splitstream(tar_data)).unwrap();
        assert_eq!(
            entries.len(),
            2,
            "expected 2 entries for target: {target_path}"
        );
        assert_eq!(entries[0].path, PathBuf::from(format!("/{target_path}")));
        assert_eq!(entries[1].path, PathBuf::from(format!("/{link_name}")));
        match &entries[1].item {
            TarItem::Hardlink(target) => {
                assert_eq!(target.to_str().unwrap(), format!("/{target_path}"));
            }
            _ => panic!("expected hardlink"),
        }
    }

    /// File sizes weighted towards interesting boundaries:
    /// - sizes at or below `INLINE_CONTENT_MAX_V0` (the inline-content limit);
    /// - sizes just above that limit;
    /// - larger sizes;
    /// - hand-picked values around 64 and around multiples of tar's 512-byte block.
    fn file_size_strategy() -> impl Strategy<Value = usize> {
        prop_oneof![
            3 => 0..=INLINE_CONTENT_MAX_V0,
            2 => (INLINE_CONTENT_MAX_V0 + 1)..=(INLINE_CONTENT_MAX_V0 + 2048),
            // Starts one past the previous bucket so the two ranges don't overlap.
            1 => (INLINE_CONTENT_MAX_V0 + 2049)..=100_000usize,
            2 => prop::sample::select(vec![
                0, 1, 63, 64, 65, 511, 512, 513, 1023, 1024, 1025,
            ]),
        ]
    }

    /// A `(name, content)` pair. The name is `file_NNNNN.bin`; the content is
    /// `size` copies of a single byte derived from `fill` and the id.
    fn tar_entry_strategy() -> impl Strategy<Value = (String, Vec<u8>)> {
        (file_size_strategy(), any::<u8>(), 0..10000u32).prop_map(|(size, fill, id)| {
            let name = format!("file_{id:05}.bin");
            // Truncating `id` to u8 is intentional: it only perturbs the fill byte.
            let content = vec![fill.wrapping_add(id as u8); size];
            (name, content)
        })
    }

    /// Serializes `entries` as regular files into an in-memory tar archive.
    fn build_tar(entries: &[(String, Vec<u8>)]) -> Vec<u8> {
        let mut tar_data = Vec::new();
        {
            let mut builder = Builder::new(&mut tar_data);
            for (name, content) in entries {
                append_file(&mut builder, name, content).unwrap();
            }
            builder.finish().unwrap();
        }
        tar_data
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(64))]

        // Paths that fit in the 100-byte UStar `name` field.
        #[test]
        fn test_short_paths(path in path_with_length(1, 100)) {
            roundtrip_path(&path);
        }

        // Paths that may be split across the UStar `prefix` + `name` fields.
        #[test]
        fn test_medium_paths(path in path_with_length(101, 255)) {
            roundtrip_path(&path);
        }

        // Paths too long for `prefix` + `name`.
        #[test]
        fn test_long_paths(path in path_with_length(256, 500)) {
            roundtrip_path(&path);
        }

        #[test]
        fn test_hardlink_targets(target in path_with_length(1, 400)) {
            roundtrip_hardlink(&target);
        }

        #[test]
        fn test_tar_byte_roundtrip_proptest(
            entries in prop::collection::vec(tar_entry_strategy(), 1..8)
        ) {
            let tar_data = build_tar(&entries);
            roundtrip_tar_bytes(&tar_data);
        }
    }
}
}