takanawa-core 0.2.0

Core chunk planning and resumable .part file state for Takanawa range downloads
Documentation
use std::ffi::OsString;
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use fs2::FileExt;
use sha2::{Digest, Sha256};

use crate::chunk::{ChunkPlan, normalize_chunk_size};
use crate::metadata::{PartMetadata, RemoteInfo, slot_size_for};
use crate::{HashConfig, Result, TakanawaError, hash_url};

/// An open, exclusively locked `.part` file for a download in progress.
///
/// The file stores the payload bytes first, followed by two metadata slots of
/// `slot_size` bytes each (see `open_or_create` for the length computation).
///
/// Fields are dropped in declaration order, so the part file handle is closed
/// before the lock file handle.
#[derive(Debug)]
pub struct PartFile {
    /// Read/write handle to the `.part` file.
    file: File,
    /// Handle to the lock file on which the exclusive lock was acquired.
    lock_file: File,
    /// Location of the lock file; `finalize` removes it (best-effort).
    lock_path: PathBuf,
    /// Location of the `.part` file; `finalize` renames it to the target path.
    part_path: PathBuf,
    /// Size in bytes of one encoded metadata slot.
    slot_size: u64,
    /// Index (0 or 1) of the slot the current metadata was read from or written to.
    active_slot: u8,
    /// In-memory copy of the metadata (chunk bitmap, generation, hash config, ...).
    metadata: PartMetadata,
}

impl PartFile {
    pub fn open_or_create(
        target_path: &Path,
        url: &str,
        remote: &RemoteInfo,
        chunk_size: u64,
        hash: HashConfig,
    ) -> Result<Self> {
        if target_path.exists() {
            return Err(TakanawaError::TargetExists(target_path.to_owned()));
        }

        let chunk_size = normalize_chunk_size(chunk_size)?;
        let part_path = part_path_for(target_path);
        let lock_path = part_lock_path_for(target_path);
        let lock_file = acquire_lock(&lock_path)?;
        let slot_size = slot_size_for(remote.content_len, chunk_size)?;
        let expected_len = remote
            .content_len
            .checked_add(slot_size.checked_mul(2).ok_or_else(|| {
                TakanawaError::InvalidConfig("part file length overflow".to_owned())
            })?)
            .ok_or_else(|| TakanawaError::InvalidConfig("part file length overflow".to_owned()))?;
        let url_hash = hash_url(url);

        if part_path.exists() {
            let mut file = OpenOptions::new().read(true).write(true).open(&part_path)?;
            let actual_len = file.metadata()?.len();
            if actual_len != expected_len {
                return Err(TakanawaError::PartSizeMismatch {
                    expected: expected_len,
                    actual: actual_len,
                });
            }

            let (metadata, active_slot) =
                read_best_metadata(&mut file, remote.content_len, slot_size)?;
            metadata.ensure_compatible(url_hash, remote, chunk_size, hash)?;
            return Ok(Self {
                file,
                lock_file,
                lock_path,
                part_path,
                slot_size,
                active_slot,
                metadata,
            });
        }

        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(&part_path)?;
        file.set_len(expected_len)?;

        let metadata = PartMetadata::new(url_hash, remote, chunk_size, hash)?;
        let slot = metadata.encode_slot(slot_size)?;
        file.seek(SeekFrom::Start(remote.content_len))?;
        file.write_all(&slot)?;
        file.sync_all()?;

        Ok(Self {
            file,
            lock_file,
            lock_path,
            part_path,
            slot_size,
            active_slot: 0,
            metadata,
        })
    }

    #[must_use]
    pub const fn metadata(&self) -> &PartMetadata {
        &self.metadata
    }

    #[must_use]
    pub fn incomplete_chunks(&self) -> Vec<u64> {
        self.metadata.bitmap.incomplete_indices()
    }

    pub fn write_chunk(&mut self, index: u64, bytes: &[u8]) -> Result<()> {
        let plan = ChunkPlan::new(self.metadata.content_len, self.metadata.chunk_size)?;
        let chunk = plan.chunk(index)?;
        if bytes.len() != usize::try_from(chunk.len).unwrap_or(usize::MAX) {
            return Err(TakanawaError::HttpProtocol(format!(
                "chunk {index} length mismatch: expected {}, got {}",
                chunk.len,
                bytes.len()
            )));
        }
        if self.metadata.bitmap.is_complete(index)? {
            return Ok(());
        }

        self.file.seek(SeekFrom::Start(chunk.start))?;
        self.file.write_all(bytes)?;
        self.file.sync_data()?;

        self.metadata.bitmap.mark_complete(index)?;
        self.commit_metadata()
    }

    pub fn finalize(mut self, target_path: &Path) -> Result<()> {
        if target_path.exists() {
            return Err(TakanawaError::TargetExists(target_path.to_owned()));
        }
        if !self.metadata.all_complete() {
            return Err(TakanawaError::InvalidConfig(
                "cannot finalize an incomplete part file".to_owned(),
            ));
        }

        if let HashConfig::Sha256(expected) = self.metadata.hash {
            let actual = self.compute_sha256()?;
            if actual != expected {
                return Err(TakanawaError::HashMismatch);
            }
        }

        let PartFile {
            file,
            lock_file,
            lock_path,
            part_path,
            metadata,
            ..
        } = self;
        file.set_len(metadata.content_len)?;
        file.sync_all()?;
        drop(file);
        fs::rename(&part_path, target_path)?;
        sync_parent_dir(target_path);
        drop(lock_file);
        let _ = fs::remove_file(lock_path);
        Ok(())
    }

    fn commit_metadata(&mut self) -> Result<()> {
        self.metadata.generation = self.metadata.generation.checked_add(1).ok_or_else(|| {
            TakanawaError::InvalidConfig("metadata generation overflow".to_owned())
        })?;
        self.active_slot = (self.metadata.generation % 2) as u8;
        let slot = self.metadata.encode_slot(self.slot_size)?;
        let offset = self.metadata.content_len + u64::from(self.active_slot) * self.slot_size;
        self.file.seek(SeekFrom::Start(offset))?;
        self.file.write_all(&slot)?;
        self.file.sync_all()?;
        Ok(())
    }

    fn compute_sha256(&mut self) -> Result<[u8; 32]> {
        let mut hasher = Sha256::new();
        let mut remaining = self.metadata.content_len;
        let mut buffer = vec![0; 1024 * 1024];
        self.file.seek(SeekFrom::Start(0))?;
        while remaining > 0 {
            let read_len = usize::try_from(remaining.min(buffer.len() as u64))
                .expect("bounded by buffer length");
            self.file.read_exact(&mut buffer[..read_len])?;
            hasher.update(&buffer[..read_len]);
            remaining -= read_len as u64;
        }
        Ok(hasher.finalize().into())
    }
}

/// Returns the path of the `.part` file for `target_path`.
///
/// The suffix is appended to the complete path string, so an existing extension
/// is kept: `file.bin` becomes `file.bin.part`.
pub fn part_path_for(target_path: &Path) -> PathBuf {
    let mut raw = OsString::from(target_path);
    raw.push(".part");
    raw.into()
}

/// Returns the path of the lock file guarding the `.part` file of `target_path`.
///
/// The suffix is appended to the complete path string: `file.bin` becomes
/// `file.bin.part.lock`.
pub fn part_lock_path_for(target_path: &Path) -> PathBuf {
    const SUFFIX: &str = ".part.lock";

    let base = target_path.as_os_str();
    let mut joined = OsString::with_capacity(base.len() + SUFFIX.len());
    joined.push(base);
    joined.push(SUFFIX);
    PathBuf::from(joined)
}

fn acquire_lock(lock_path: &Path) -> Result<File> {
    let lock_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(lock_path)?;
    match lock_file.try_lock_exclusive() {
        Ok(()) => Ok(lock_file),
        Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
            Err(TakanawaError::PartBusy(lock_path.to_owned()))
        }
        Err(err) => Err(TakanawaError::Io(err)),
    }
}

/// Reads both metadata slots located after the `content_len` payload bytes and
/// returns the decodable one with the highest generation, together with its
/// slot index.
///
/// A slot that fails to decode is skipped. When both slots carry the same
/// generation, the later slot (index 1) is chosen.
///
/// # Errors
///
/// - [`TakanawaError::PartCorrupt`] if `slot_size` does not fit in `usize` or if
///   neither slot decodes.
/// - Any I/O error from seeking or reading a slot.
fn read_best_metadata(
    file: &mut File,
    content_len: u64,
    slot_size: u64,
) -> Result<(PartMetadata, u8)> {
    let Ok(slot_len) = usize::try_from(slot_size) else {
        return Err(TakanawaError::PartCorrupt("slot size overflow".to_owned()));
    };

    // `read_exact` overwrites the whole buffer, so one allocation serves both slots.
    let mut raw = vec![0; slot_len];
    let mut best: Option<(PartMetadata, u8)> = None;

    for slot_index in [0_u8, 1] {
        let offset = content_len + u64::from(slot_index) * slot_size;
        file.seek(SeekFrom::Start(offset))?;
        file.read_exact(&mut raw)?;

        let Ok(candidate) = PartMetadata::decode_slot(&raw) else {
            continue;
        };
        let supersedes = match &best {
            // `>=` lets the later slot win a tie.
            Some((current, _)) => candidate.generation >= current.generation,
            None => true,
        };
        if supersedes {
            best = Some((candidate, slot_index));
        }
    }

    match best {
        Some(found) => Ok(found),
        None => Err(TakanawaError::PartCorrupt(
            "no valid metadata slot found".to_owned(),
        )),
    }
}

/// Best-effort sync of the directory containing `target_path`, so that a
/// preceding rename into that directory is flushed to disk.
///
/// Failures to open or sync the directory are deliberately ignored.
#[cfg(unix)]
fn sync_parent_dir(target_path: &Path) {
    let parent = match target_path.parent() {
        // A bare file name such as `file.bin` has the empty path as its parent,
        // which cannot be opened; the file lives in the current directory.
        Some(parent) if parent.as_os_str().is_empty() => Path::new("."),
        Some(parent) => parent,
        None => return,
    };
    if let Ok(dir) = File::open(parent) {
        let _ = dir.sync_all();
    }
}

/// Non-Unix counterpart of `sync_parent_dir`: does nothing.
#[cfg(not(unix))]
fn sync_parent_dir(_target_path: &Path) {}

#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::TempDir;

    use super::*;

    /// URL used for every part file opened by these tests.
    const URL: &str = "https://example.test/file";
    /// Payload length announced by the fake remote.
    const CONTENT_LEN: u64 = 6;
    /// Chunk size requested by the tests; splits the payload into two chunks.
    const CHUNK_SIZE: u64 = 3;

    /// Builds a `RemoteInfo` with fixed validators and the given length.
    fn remote(content_len: u64) -> RemoteInfo {
        RemoteInfo {
            content_len,
            etag: Some("etag".to_owned()),
            last_modified: Some("now".to_owned()),
        }
    }

    /// Opens or creates the part file for `target` with the shared test parameters.
    fn try_open(target: &Path) -> crate::Result<PartFile> {
        PartFile::open_or_create(
            target,
            URL,
            &remote(CONTENT_LEN),
            CHUNK_SIZE,
            HashConfig::None,
        )
    }

    #[test]
    fn resumes_valid_part() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");

        // First session: store one chunk, then release the part file and its lock.
        let mut first = try_open(&target).unwrap();
        first.write_chunk(0, b"abc").unwrap();
        drop(first);

        // Second session must see the chunk stored by the first one.
        let resumed = try_open(&target).unwrap();

        assert_eq!(resumed.metadata().completed_chunks(), 1);
        assert_eq!(resumed.incomplete_chunks(), vec![1]);
    }

    #[test]
    fn rejects_part_size_mismatch() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        fs::write(part_path_for(&target), b"too short").unwrap();

        let err = try_open(&target).unwrap_err();

        assert!(matches!(err, TakanawaError::PartSizeMismatch { .. }));
    }

    #[test]
    fn finalizes_and_strips_metadata() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");

        let mut part = try_open(&target).unwrap();
        // Chunks are written out of order on purpose.
        part.write_chunk(1, b"def").unwrap();
        part.write_chunk(0, b"abc").unwrap();
        part.finalize(&target).unwrap();

        assert_eq!(fs::read(&target).unwrap(), b"abcdef");
        assert!(!part_path_for(&target).exists());
    }
}