pixelflow-filters 0.1.0

Official in-repository filters for PixelFlow.
use std::fs::File;
use std::io::{Read, Write};
use std::path::{Component, Path, PathBuf};
use std::time::UNIX_EPOCH;

use pixelflow_core::{ErrorCategory, ErrorCode, PixelFlowError, Result};
use semisafe::slice::get as semisafe_get;
use semisafe::slice::get_mut as semisafe_get_mut;
use tempfile::NamedTempFile;

/// Version stamp written into every cache header; `read_cache` treats any other
/// value as a cache miss and returns `Ok(None)`.
pub(crate) const CACHE_FORMAT_VERSION: u32 = 1;
/// Number of leading source bytes (16 MiB) hashed into a `SourceFingerprint`.
pub(crate) const HASH_WINDOW_BYTES: usize = 16 * 1024 * 1024;
/// Fixed 12-byte prefix that opens every PixelFlow FFMS2 index cache file.
const CACHE_MAGIC: &[u8; 12] = b"PXFLFFMS2IDX";

/// Cheap identity of a source file, used to decide whether a cached FFMS2
/// index still belongs to that file.
///
/// Combines the file size, its modification time in nanoseconds since the
/// Unix epoch, and a BLAKE3 hash of at most the first `HASH_WINDOW_BYTES`
/// bytes. Because only that leading window is hashed, an edit confined to
/// later bytes can only be detected through `size` and `modified_unix_nanos`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct SourceFingerprint {
    /// File length in bytes, taken from filesystem metadata.
    size: u64,
    /// Last-modification time in nanoseconds since the Unix epoch.
    modified_unix_nanos: i128,
    /// BLAKE3 digest of the first `HASH_WINDOW_BYTES` bytes, or of the whole
    /// file when it is shorter than that.
    first_16_mib_hash: [u8; 32],
}

impl SourceFingerprint {
    /// Computes the fingerprint of the file at `path`.
    ///
    /// The file is opened once, and both the metadata and the hashed bytes are
    /// taken from that same handle. A path that is replaced while we work
    /// therefore cannot yield a fingerprint that mixes two different files.
    ///
    /// # Errors
    ///
    /// Returns a `source.fingerprint` error when the file cannot be opened,
    /// stat'ed or read, when its modification time is unavailable or precedes
    /// the Unix epoch, or when that timestamp does not fit in an `i128`.
    pub(crate) fn from_file(path: &Path) -> Result<Self> {
        let mut file = File::open(path).map_err(|error| {
            fingerprint_error(format!(
                "failed to open source '{}': {error}",
                path.display()
            ))
        })?;
        // Stat the open handle rather than the path so size/mtime describe the
        // exact file whose bytes are hashed below.
        let metadata = file.metadata().map_err(|error| {
            fingerprint_error(format!(
                "failed to stat source '{}': {error}",
                path.display()
            ))
        })?;
        let modified = metadata.modified().map_err(|error| {
            fingerprint_error(format!(
                "failed to read modified time for source '{}': {error}",
                path.display()
            ))
        })?;
        let modified_unix_nanos = i128::try_from(
            modified
                .duration_since(UNIX_EPOCH)
                .map_err(|error| {
                    fingerprint_error(format!(
                        "source '{}' modified time predates unix epoch: {error}",
                        path.display()
                    ))
                })?
                .as_nanos(),
        )
        .map_err(|_| {
            fingerprint_error(format!(
                "source '{}' modified time does not fit cache fingerprint",
                path.display()
            ))
        })?;

        let mut hasher = blake3::Hasher::new();
        let mut buffer = [0u8; 8192];
        let mut remaining = HASH_WINDOW_BYTES;
        while remaining > 0 {
            let read_len = buffer.len().min(remaining);
            let bytes_read = match file.read(semisafe_get_mut(&mut buffer, ..read_len)) {
                // End of file before the window filled up: hash what we have.
                Ok(0) => break,
                Ok(bytes_read) => bytes_read,
                // The read was interrupted before transferring data; the
                // `Read::read` contract says to simply retry.
                Err(error) if error.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(error) => {
                    return Err(fingerprint_error(format!(
                        "failed to read source '{}': {error}",
                        path.display()
                    )));
                }
            };
            hasher.update(semisafe_get(&buffer, ..bytes_read));
            remaining -= bytes_read;
        }

        Ok(Self {
            size: metadata.len(),
            modified_unix_nanos,
            first_16_mib_hash: *hasher.finalize().as_bytes(),
        })
    }

    /// Test-only accessor for the hash of the leading window.
    #[cfg(test)]
    pub(crate) const fn first_16_mib_hash(&self) -> [u8; 32] {
        self.first_16_mib_hash
    }
}

/// Builds the cache file name for the FFMS2 index of `source_path`.
///
/// The name has the form `<stem>-<digest>.ffms2.pfidx`:
/// - `<stem>` is a filesystem-safe rendering of the path's normal components
///   joined by `_`. `..`, `.`, root and prefix components are dropped, and
///   the result is passed through `sanitize_cache_stem`.
/// - `<digest>` is the first 16 hex characters of the BLAKE3 hash of the
///   full, unmodified `source_path`.
///
/// The digest keeps distinct paths apart even when their stems sanitize to
/// the same text. Stems longer than `MAX_STEM_CHARS` keep only their tail,
/// because for deep paths the file name is the most recognisable part. This
/// keeps the whole name within the usual 255-byte file-name limit.
pub(crate) fn cache_file_name(source_path: &str) -> String {
    // 128 + 1 + 16 + ".ffms2.pfidx".len() stays well below a 255-byte limit.
    const MAX_STEM_CHARS: usize = 128;

    let joined = Path::new(source_path)
        .components()
        .filter_map(|component| match component {
            Component::Normal(value) => Some(value.to_string_lossy().into_owned()),
            _ => None,
        })
        .collect::<Vec<_>>()
        .join("_");
    let sanitized = sanitize_cache_stem(if joined.is_empty() {
        source_path
    } else {
        &joined
    });

    // Count chars (not bytes) so the cut can never split a code point.
    let excess = sanitized.chars().count().saturating_sub(MAX_STEM_CHARS);
    let stem = if excess == 0 {
        sanitized
    } else {
        sanitized
            .chars()
            .skip(excess)
            .collect::<String>()
            .trim_start_matches('_')
            .to_owned()
    };

    let digest = blake3::hash(source_path.as_bytes()).to_hex();
    let short_digest = digest.chars().take(16).collect::<String>();
    format!("{stem}-{short_digest}.ffms2.pfidx")
}

/// Returns the default cache location for `source_path`: a file named by
/// `cache_file_name`, placed directly inside `script_dir`.
pub(crate) fn default_cache_path(script_dir: &Path, source_path: &str) -> PathBuf {
    let file_name = cache_file_name(source_path);
    let mut cache_path = script_dir.to_path_buf();
    cache_path.push(file_name);
    cache_path
}

/// Loads the FFMS2 index payload stored at `path` if it matches `fingerprint`.
///
/// On-disk layout (integers little-endian), mirroring `write_cache`:
/// magic (12 bytes) | format version (u32) | source size (u64) |
/// modified time in unix nanos (i128) | BLAKE3 hash (32 bytes) |
/// payload length (u64) | payload bytes.
///
/// Returns `Ok(None)` when the cache file does not exist, or when it was
/// written for a different format version or source fingerprint. In either
/// case the caller is expected to rebuild the index.
///
/// # Errors
///
/// Returns a `source.cache_read` error in these cases:
/// - the file cannot be read;
/// - the file is shorter than the header or has the wrong magic;
/// - the declared payload length does not fit `usize`;
/// - the declared payload length disagrees with the bytes actually present.
pub(crate) fn read_cache(path: &Path, fingerprint: &SourceFingerprint) -> Result<Option<Vec<u8>>> {
    // magic + version + size + mtime + hash + payload length.
    const HEADER_LEN: usize = CACHE_MAGIC.len() + 4 + 8 + 16 + 32 + 8;

    let bytes = match std::fs::read(path) {
        Ok(bytes) => bytes,
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(error) => {
            return Err(cache_read_error(format!(
                "failed to read cache '{}': {error}",
                path.display()
            )));
        }
    };

    if bytes.len() < HEADER_LEN {
        return Err(cache_read_error(format!(
            "cache '{}' is truncated",
            path.display()
        )));
    }

    let mut cursor = 0usize;
    if semisafe_get(&bytes, cursor..cursor + CACHE_MAGIC.len()) != CACHE_MAGIC {
        return Err(cache_read_error(format!(
            "cache '{}' has invalid header",
            path.display()
        )));
    }
    cursor += CACHE_MAGIC.len();

    let version = read_u32(&bytes, &mut cursor, path)?;
    let size = read_u64(&bytes, &mut cursor, path)?;
    let modified_unix_nanos = read_i128(&bytes, &mut cursor, path)?;
    let hash = read_hash(&bytes, &mut cursor, path)?;
    let index_len = usize::try_from(read_u64(&bytes, &mut cursor, path)?).map_err(|_| {
        cache_read_error(format!(
            "cache '{}' index length does not fit platform usize",
            path.display()
        ))
    })?;

    // A stale cache is a miss, not an error: the caller rebuilds and rewrites it.
    if version != CACHE_FORMAT_VERSION
        || size != fingerprint.size
        || modified_unix_nanos != fingerprint.modified_unix_nanos
        || hash != fingerprint.first_16_mib_hash
    {
        return Ok(None);
    }

    // Compare lengths instead of computing `cursor + index_len`: `index_len`
    // comes from the file, and a corrupt value near `usize::MAX` would
    // overflow that addition.
    let payload = semisafe_get(&bytes, cursor..);
    if payload.len() != index_len {
        return Err(cache_read_error(format!(
            "cache '{}' payload length does not match header",
            path.display()
        )));
    }

    Ok(Some(payload.to_vec()))
}

/// Atomically writes `ffms_index_bytes` to `path`, prefixed by a header that
/// records `fingerprint` (layout documented on `read_cache`).
///
/// The data is first written to a temporary file in the same directory,
/// synced to disk, and then renamed over `path`. Readers therefore never
/// observe a partially written cache.
///
/// # Errors
///
/// Returns a `source.cache_write` error in these cases:
/// - the index length does not fit in a u64;
/// - the temporary file cannot be created, written or synced;
/// - the final rename (persist) fails, e.g. when `path` names a directory.
pub(crate) fn write_cache(
    path: &Path,
    fingerprint: &SourceFingerprint,
    ffms_index_bytes: &[u8],
) -> Result<()> {
    // magic + version + size + mtime + hash + payload length.
    const HEADER_LEN: usize = CACHE_MAGIC.len() + 4 + 8 + 16 + 32 + 8;

    let parent = cache_parent(path)?;
    let index_len = u64::try_from(ffms_index_bytes.len())
        .map_err(|_| cache_write_error("FFMS2 index payload exceeds supported size"))?;

    let mut header = Vec::with_capacity(HEADER_LEN);
    header.extend_from_slice(CACHE_MAGIC);
    header.extend_from_slice(&CACHE_FORMAT_VERSION.to_le_bytes());
    header.extend_from_slice(&fingerprint.size.to_le_bytes());
    header.extend_from_slice(&fingerprint.modified_unix_nanos.to_le_bytes());
    header.extend_from_slice(&fingerprint.first_16_mib_hash);
    header.extend_from_slice(&index_len.to_le_bytes());

    let mut temp = NamedTempFile::new_in(parent).map_err(|error| {
        cache_write_error(format!(
            "failed to create temporary cache file in '{}': {error}",
            parent.display()
        ))
    })?;
    // Write the header and the index separately, so the (potentially large)
    // index is never copied into an intermediate buffer.
    for chunk in [header.as_slice(), ffms_index_bytes] {
        temp.write_all(chunk).map_err(|error| {
            cache_write_error(format!(
                "failed to write temporary cache file for '{}': {error}",
                path.display()
            ))
        })?;
    }
    // `File::flush` does nothing, so force the contents to stable storage
    // before the rename. Otherwise, on some filesystems a crash right after
    // `persist` can leave a renamed but incomplete cache file behind.
    temp.as_file().sync_all().map_err(|error| {
        cache_write_error(format!(
            "failed to sync temporary cache file for '{}': {error}",
            path.display()
        ))
    })?;
    temp.persist(path).map_err(|error| {
        cache_write_error(format!(
            "failed to persist cache '{}': {}",
            path.display(),
            error.error
        ))
    })?;

    Ok(())
}

/// Turns an arbitrary path string into a filename-safe stem.
///
/// ASCII alphanumerics, `.`, `_` and `-` are kept; every other character
/// (including non-ASCII) becomes `_`. Consecutive underscores collapse into
/// one, and leading/trailing underscores are stripped. An empty result falls
/// back to `"source"`.
fn sanitize_cache_stem(source_path: &str) -> String {
    let mut stem = String::with_capacity(source_path.len());

    for original in source_path.chars() {
        let keep = original.is_ascii_alphanumeric() || matches!(original, '.' | '_' | '-');
        let mapped = if keep { original } else { '_' };
        // Only push an underscore when the stem does not already end in one.
        if mapped == '_' && stem.ends_with('_') {
            continue;
        }
        stem.push(mapped);
    }

    match stem.trim_matches('_') {
        "" => String::from("source"),
        trimmed => trimmed.to_owned(),
    }
}

/// Returns the directory that should hold the temporary file for `path`.
///
/// Falls back to the current directory (`.`) when `path` has no parent, or
/// when its parent is empty (a bare file name).
fn cache_parent(path: &Path) -> Result<&Path> {
    let directory = path
        .parent()
        .filter(|candidate| !candidate.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    Ok(directory)
}

/// Consumes exactly `N` bytes at `*cursor` and returns them as a fixed array.
fn read_array<const N: usize>(bytes: &[u8], cursor: &mut usize, path: &Path) -> Result<[u8; N]> {
    let mut array = [0u8; N];
    array.copy_from_slice(take_exact(bytes, cursor, N, path)?);
    Ok(array)
}

/// Reads a little-endian `u32` at `*cursor` and advances past it.
fn read_u32(bytes: &[u8], cursor: &mut usize, path: &Path) -> Result<u32> {
    read_array::<4>(bytes, cursor, path).map(u32::from_le_bytes)
}

/// Reads a little-endian `u64` at `*cursor` and advances past it.
fn read_u64(bytes: &[u8], cursor: &mut usize, path: &Path) -> Result<u64> {
    read_array::<8>(bytes, cursor, path).map(u64::from_le_bytes)
}

/// Reads a little-endian `i128` at `*cursor` and advances past it.
fn read_i128(bytes: &[u8], cursor: &mut usize, path: &Path) -> Result<i128> {
    read_array::<16>(bytes, cursor, path).map(i128::from_le_bytes)
}

/// Reads a raw 32-byte hash at `*cursor` and advances past it.
fn read_hash(bytes: &[u8], cursor: &mut usize, path: &Path) -> Result<[u8; 32]> {
    read_array::<32>(bytes, cursor, path)
}

/// Returns the `len` bytes that start at `*cursor` and moves the cursor past them.
///
/// # Errors
///
/// Returns a `source.cache_read` error when `*cursor + len` overflows, or
/// when it runs past the end of `bytes`. The cursor is left unchanged in
/// both cases.
fn take_exact<'a>(
    bytes: &'a [u8],
    cursor: &mut usize,
    len: usize,
    path: &Path,
) -> Result<&'a [u8]> {
    let start = *cursor;
    let Some(end) = start.checked_add(len) else {
        return Err(cache_read_error(format!(
            "cache '{}' cursor overflowed while parsing",
            path.display()
        )));
    };
    let Some(window) = bytes.get(start..end) else {
        return Err(cache_read_error(format!(
            "cache '{}' ended unexpectedly while parsing",
            path.display()
        )));
    };
    *cursor = end;
    Ok(window)
}

/// Wraps `message` in a `Source`-category `PixelFlowError` tagged with `code`.
fn source_error(code: &'static str, message: impl Into<String>) -> PixelFlowError {
    PixelFlowError::new(ErrorCategory::Source, ErrorCode::new(code), message)
}

/// Error for failures while fingerprinting a source file (`source.fingerprint`).
fn fingerprint_error(message: impl Into<String>) -> PixelFlowError {
    source_error("source.fingerprint", message)
}

/// Error for failures while reading or parsing a cache file (`source.cache_read`).
fn cache_read_error(message: impl Into<String>) -> PixelFlowError {
    source_error("source.cache_read", message)
}

/// Error for failures while writing a cache file (`source.cache_write`).
fn cache_write_error(message: impl Into<String>) -> PixelFlowError {
    source_error("source.cache_write", message)
}

// Unit tests for cache naming, source fingerprinting, and the cache file
// write/read round trip. Each test uses its own temporary directory.
#[cfg(test)]
mod tests {
    #![expect(clippy::indexing_slicing, reason = "allow in tests")]

    use tempfile::tempdir;

    use pixelflow_core::{ErrorCategory, ErrorCode};

    use super::{SourceFingerprint, cache_file_name, read_cache, write_cache};

    // The leading `..` is dropped and path separators become `_`. Runs of
    // invalid characters (space, `#`) collapse to one `_`. The final name
    // contains only filename-safe ASCII.
    #[test]
    fn sanitized_cache_filename_is_stable_and_collision_resistant() {
        let name = cache_file_name("../Media/Input Clip #1.mkv");

        assert!(name.starts_with("Media_Input_Clip_1.mkv-"));
        assert!(name.ends_with(".ffms2.pfidx"));
        assert!(
            name.chars()
                .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
        );
    }

    // The byte at offset 16 MiB lies just past the hashed window, so changing
    // it must leave the window hash unchanged. Note: this writes ~16 MiB twice.
    #[test]
    fn fingerprint_hashes_only_first_16_mib() {
        let temp = tempdir().expect("tempdir");
        let path = temp.path().join("source.bin");
        let mut data = vec![1u8; 16 * 1024 * 1024 + 1];
        std::fs::write(&path, &data).expect("write source");
        let first = SourceFingerprint::from_file(&path).expect("fingerprint");

        data[16 * 1024 * 1024] = 2;
        std::fs::write(&path, &data).expect("rewrite source");
        let second = SourceFingerprint::from_file(&path).expect("fingerprint");

        assert_eq!(first.first_16_mib_hash(), second.first_16_mib_hash());
    }

    // A freshly written cache reads back its exact payload. After the source
    // content changes (same length, different bytes, hence a different hash),
    // the stale cache is reported as a miss (`None`) rather than an error.
    #[test]
    fn cache_payload_roundtrips_and_rejects_mismatched_fingerprint() {
        let temp = tempdir().expect("tempdir");
        let source = temp.path().join("input.bin");
        std::fs::write(&source, b"abcdef").expect("source");
        let fingerprint = SourceFingerprint::from_file(&source).expect("fingerprint");
        let cache = temp.path().join("cache.ffms2.pfidx");

        write_cache(&cache, &fingerprint, b"raw-index").expect("cache write");
        assert_eq!(
            read_cache(&cache, &fingerprint).expect("cache read"),
            Some(b"raw-index".to_vec())
        );

        std::fs::write(&source, b"abcdeg").expect("source changes hash");
        let changed = SourceFingerprint::from_file(&source).expect("fingerprint");
        assert_eq!(read_cache(&cache, &changed).expect("cache read"), None);
    }

    // Uses the temp directory itself as the cache path, so the write cannot
    // replace it. Presumably this fails at the final rename/persist step. The
    // resulting error must carry the structured `Source` category and the
    // `source.cache_write` code.
    #[test]
    fn cache_write_failure_is_structured_source_error() {
        let temp = tempdir().expect("tempdir");
        let source = temp.path().join("input.bin");
        std::fs::write(&source, b"abcdef").expect("source");
        let fingerprint = SourceFingerprint::from_file(&source).expect("fingerprint");

        let error = write_cache(temp.path(), &fingerprint, b"raw-index")
            .expect_err("directory path cannot be overwritten as cache file");

        assert_eq!(error.category(), ErrorCategory::Source);
        assert_eq!(error.code(), ErrorCode::new("source.cache_write"));
    }
}