//! sparseio 0.0.1
//!
//! A library for coordinating sparse, out-of-order byte-range fetching and
//! materialization of large objects. See the crate documentation for details.
//! Example Sparse File-to-File Implementation
mod common;

use std::fs;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;

use clap::Parser;
use common::sparse_materialization::{
    SparseMaterializationConfig, SparseMaterializationOptions, run_sparse_materialization,
};
use sparseio::Builder;
use sparseio::sources::file::{Reader, Writer};
use sparseio::utils::materialization;

/// Simple sparse file-to-file implementation. This example demonstrates how to use
/// the sparse I/O library to implement a simple file-to-file copy operation that
/// supports sparse files. It reads from a source file and writes to a destination
/// file, while tracking coverage and in-flight operations.
// NOTE: the `///` field docs below double as clap-generated help text, so they
// are user-facing strings — keep them as written.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// Input file path
    #[arg(short, long, default_value = "target/manual/file-to-file-src.bin")]
    src: String,

    /// Output file path
    #[arg(short, long, default_value = "target/manual/file-to-file-dst.bin")]
    dst: String,

    /// Generate a deterministic source file before materializing.
    // NOTE(review): with `default_value_t = true` and clap's implicit SetTrue
    // action for `bool`, there appears to be no way to turn generation off
    // from the CLI — confirm this is intended.
    #[arg(long, default_value_t = true)]
    generate_source: bool,

    /// Length of the autogenerated source file in bytes.
    #[arg(long, default_value_t = 8 * 1024 * 1024)]
    source_len: usize,

    /// Pre-size destination logical length before random chunk materialization.
    #[arg(long, default_value_t = false)]
    pre_size_dst: bool,

    // Shared sparse-materialization knobs (e.g. chunk_size) reused across examples.
    #[command(flatten)]
    sparse: SparseMaterializationOptions,
}

#[tokio::main]
/// Executes the file-to-file sparse materialization example workflow.
///
/// Steps: parse and validate CLI options, optionally regenerate the source
/// file, reset the destination, build the sparse I/O pipeline, run the
/// chunk-by-chunk materialization with sparseness diagnostics, and finally
/// verify the destination against the source.
async fn main() -> std::io::Result<()> {
    let args = Args::parse();
    // Fail fast on inconsistent sparse options before touching any files.
    args.sparse.validate()?;

    let src_path = Path::new(&args.src).to_path_buf();
    let dst_path = Path::new(&args.dst).to_path_buf();
    if args.generate_source {
        generate_source_file(&src_path, args.source_len)?;
        println!("generated source file: {} ({} bytes)", src_path.display(), args.source_len);
    }
    // Start from a clean destination so coverage/allocation measurements are meaningful.
    if dst_path.exists() {
        std::fs::remove_file(&dst_path)?;
    }

    // Arc so the fetch closure below can hold its own handle across async step calls.
    let sparse_io = Arc::new(
        Builder::new()
            .chunk_size(args.sparse.chunk_size)
            .reader(Reader::new(src_path.clone()))
            .writer(Writer::new(dst_path.clone()))
            .build()
            .await?,
    );
    let len = sparse_io.len();
    if len == 0 {
        return Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "source file is empty"));
    }

    if args.pre_size_dst {
        // Truncate + set_len gives the destination its full logical length up
        // front without writing any data, so the file can stay sparse on disk.
        let file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&dst_path)?;
        file.set_len(len as u64)?;
        println!("pre-sized destination file: {} ({} bytes logical)", dst_path.display(), len);
    }

    let chunk_size = args.sparse.chunk_size;
    let result = run_sparse_materialization(
        SparseMaterializationConfig {
            len,
            options: &args.sparse,
        },
        // Fetch callback: read one chunk-aligned region from the source.
        |offset| {
            let sparse_io = sparse_io.clone();
            async move {
                let mut viewer = sparse_io.viewer();
                // Round the requested offset down to a chunk boundary.
                let normalized_offset = offset - (offset % chunk_size);
                viewer.seek(normalized_offset)?;
                let mut buffer = vec![0u8; chunk_size];
                viewer.read(&mut buffer).await
            }
        },
        // Progress callback: report logical vs. physically allocated size after
        // each step, and assert sparseness invariants after the first write.
        {
            let dst_path = dst_path.clone();
            // Carries the previous step's allocation so per-step deltas can be shown.
            let mut previous_allocated = 0u64;
            move |step| {
                let logical_size = fs::metadata(&dst_path)?.len();
                let actual_size = allocated_bytes(&dst_path)?;

                println!("logical file size: {} bytes", logical_size);
                println!(
                    "step progress: {:>6.2}% (requested offset {})",
                    step.progress_percent, step.requested_offset
                );
                match actual_size {
                    Some(actual_size) => {
                        let delta = actual_size.saturating_sub(previous_allocated);
                        println!("actual allocated size: {} bytes (+{} bytes)", actual_size, delta);
                        previous_allocated = actual_size;
                    }
                    None => println!("actual allocated size: unavailable on this platform"),
                }

                // Only the first of several steps can demonstrate sparseness: the
                // file must already reach the written extent yet hold fewer bytes
                // than the full object.
                if step.index == 0 && step.total_steps > 1 {
                    let first_write_end = (step.normalized_offset + step.chunk_len) as u64;
                    assert!(
                        logical_size >= first_write_end,
                        "after first write, logical size should include the written extent end"
                    );
                    assert!(
                        logical_size <= step.len as u64,
                        "logical size cannot exceed the configured object length"
                    );
                    assert!(
                        step.materialized_bytes < step.len,
                        "after the first write, the destination should still be logically sparse"
                    );
                    println!(
                        "sparse checkpoint confirmed: logical size is {} bytes while only {} bytes have been materialized",
                        logical_size, step.materialized_bytes
                    );

                    // Pick an addressable chunk offset that has not been written
                    // yet: reading it back should yield only zero bytes.
                    let hole_offset = step
                        .logical_offsets
                        .iter()
                        .copied()
                        .find(|offset| *offset < logical_size as usize && !step.filled_offsets.contains(offset));
                    if let Some(zero_check_offset) = hole_offset {
                        // NOTE(review): this length is capped by the object length,
                        // not the current logical size — assumes a hole chunk that
                        // starts below logical_size is fully readable; confirm for
                        // edge-case sizes, since read_exact would fail otherwise.
                        let zero_check_len = (step.len - zero_check_offset).min(step.chunk_size);
                        assert_unwritten_region_is_zeroed(&dst_path, zero_check_offset, zero_check_len)?;
                        println!(
                            "hole check passed: unwritten region [{}, {}) still reads as zeroes",
                            zero_check_offset,
                            zero_check_offset + zero_check_len
                        );
                    } else {
                        println!("hole check skipped: no unwritten region currently addressable");
                    }

                    // Allocation-level sparseness is filesystem-dependent, so a
                    // dense report here is only a warning, not a failure.
                    if let Some(actual_size) = actual_size {
                        if actual_size < logical_size {
                            println!(
                                "filesystem view also looks sparse: actual={} logical={}",
                                actual_size, logical_size
                            );
                        } else {
                            eprintln!(
                                "filesystem view did not expose sparseness here: actual={} logical={}",
                                actual_size, logical_size
                            );
                        }
                    }
                }

                Ok(())
            }
        },
    )
    .await?;

    // Full byte-for-byte verification when every chunk was filled; otherwise
    // verify filled chunks match the source and the rest remains zeroed.
    if result.filled_offsets.len() == result.logical_offsets.len() {
        materialization::verify_full_materialization(&src_path, &dst_path)?;
        println!("final verification passed: destination matches source");
    } else {
        materialization::verify_partial_materialization(
            &src_path,
            &dst_path,
            &result.filled_offsets,
            args.sparse.chunk_size,
            len,
        )?;
        println!("partial verification passed: filled chunks match source, unfilled chunks remain zeroed");
    }

    Ok(())
}

/// Generates deterministic source content of exactly `len` bytes at `path`.
///
/// The output is the 1 MiB pattern `i % 251` repeated (and truncated) to
/// `len` bytes, so repeated runs always produce identical source files.
/// Parent directories are created as needed.
fn generate_source_file(path: &Path, len: usize) -> std::io::Result<()> {
    const CHUNK_LEN: usize = 1024 * 1024;

    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }

    // Build the repeating pattern block once; 251 is prime, giving a cycle
    // that does not align with power-of-two chunk boundaries.
    let pattern: Vec<u8> = (0..CHUNK_LEN).map(|index| (index % 251) as u8).collect();

    let mut file = fs::File::create(path)?;
    let mut written = 0usize;
    while written < len {
        let take = CHUNK_LEN.min(len - written);
        file.write_all(&pattern[..take])?;
        written += take;
    }

    file.flush()
}

#[cfg(target_os = "linux")]
/// Returns physically allocated bytes for `path` on Linux filesystems.
///
/// `st_blocks` is always reported in 512-byte units, independent of the
/// filesystem's actual block size, so the scale factor here is fixed.
fn allocated_bytes(path: &Path) -> std::io::Result<Option<u64>> {
    use std::os::unix::fs::MetadataExt;

    let blocks = fs::metadata(path)?.blocks();
    Ok(Some(blocks * 512))
}

#[cfg(target_os = "macos")]
/// Returns physically allocated bytes for `path` on macOS via `du -k`.
///
/// `du -k` reports usage in 1 KiB units; the first whitespace-separated
/// column of its output is parsed and scaled back to bytes.
fn allocated_bytes(path: &Path) -> std::io::Result<Option<u64>> {
    use std::process::Command;

    let output = Command::new("du").arg("-k").arg(path).output()?;
    if !output.status.success() {
        return Err(std::io::Error::other(format!(
            "du -k failed for {} with status {:?}",
            path.display(),
            output.status.code()
        )));
    }

    let text = String::from_utf8_lossy(&output.stdout);
    // The size column is the first token; the remainder is the path.
    let first_column = text.split_whitespace().next().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::InvalidData, "du output missing size column")
    })?;
    let kib: u64 = first_column.parse().map_err(std::io::Error::other)?;

    Ok(Some(kib * 1024))
}

#[cfg(not(any(target_os = "linux", target_os = "macos")))]
/// Returns `None` where allocated-size inspection is not implemented.
fn allocated_bytes(_path: &Path) -> std::io::Result<Option<u64>> {
    // No allocated-size query implemented for this platform; callers treat
    // `None` as "unavailable" and skip allocation reporting.
    Ok(None)
}

/// Asserts the specified destination range reads back as all zero bytes.
///
/// Reads exactly `len` bytes starting at `offset` from the file at `path`.
///
/// # Panics
/// Panics if any byte in `[offset, offset + len)` is non-zero.
fn assert_unwritten_region_is_zeroed(path: &Path, offset: usize, len: usize) -> std::io::Result<()> {
    let mut buffer = vec![0u8; len];
    let mut file = fs::File::open(path)?;
    file.seek(SeekFrom::Start(offset as u64))?;
    file.read_exact(&mut buffer)?;

    let first_nonzero = buffer.iter().position(|byte| *byte != 0);
    assert!(
        first_nonzero.is_none(),
        "expected unwritten region [{}, {}) to read back as zeroes",
        offset,
        offset + len
    );

    Ok(())
}