trazaeo 0.5.6

Open-source provenance SDK and specification for verifiable EO and climate data workflows
Documentation
use super::util::{
    from_json, into_py_result_display, parse_hash32_hex, to_json, validate_input_path,
};
use crate::python_facade as facade;
use pyo3::prelude::*;
use pyo3::types::PyModule;
use pyo3::wrap_pyfunction;

/// Builds the `ContentDescriptor` for the file at `path`.
///
/// The path is first passed through `validate_input_path`; the validated path is what gets
/// handed to `facade::content_descriptor_for_path`. Missing optional metadata is defaulted:
///
/// * `artifact_id` - the validated path's file name, or `"artifact"` when there is no file
///   name or it is not valid UTF-8;
/// * `media_type` - `"application/octet-stream"`;
/// * `created_at` - `"1970-01-01T00:00:00Z"`.
///
/// `num_threads` is resolved through `thread_count`.
///
/// # Errors
/// Propagates the path-validation error, or the facade error as converted by
/// `into_py_result_display`.
fn descriptor_for_path(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
    artifact_id: Option<&str>,
    media_type: Option<&str>,
    created_at: Option<&str>,
) -> PyResult<crate::content::ContentDescriptor> {
    let canonical = validate_input_path(path)?;
    let threads = thread_count(num_threads);

    // Fall back to the file name of the validated path only when no id was supplied.
    let resolved_artifact_id = match artifact_id {
        Some(explicit_id) => explicit_id,
        None => canonical
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or("artifact"),
    };
    let resolved_media_type = media_type.unwrap_or("application/octet-stream");
    let resolved_created_at = created_at.unwrap_or("1970-01-01T00:00:00Z");

    let descriptor = facade::content_descriptor_for_path(
        &canonical,
        chunk_size,
        threads,
        resolved_artifact_id,
        resolved_media_type,
        resolved_created_at,
    );
    into_py_result_display(descriptor)
}

/// Resolves an optional caller-supplied thread count.
///
/// Returns the supplied value unchanged when present, and `1` when `num_threads` is `None`.
fn thread_count(num_threads: Option<usize>) -> usize {
    /// Thread count used when the caller does not specify one.
    const DEFAULT_THREADS: usize = 1;

    num_threads.unwrap_or(DEFAULT_THREADS)
}

/// Reads the entire file at `canonical` into memory.
///
/// # Errors
/// An I/O failure is wrapped by `facade::io_err` under the `operation` label (with the
/// error's display text as detail) and converted with `super::util::py_err`.
fn read_canonical_file_bytes(
    canonical: &std::path::Path,
    operation: &'static str,
) -> PyResult<Vec<u8>> {
    match std::fs::read(canonical) {
        Ok(contents) => Ok(contents),
        Err(io_error) => Err(super::util::py_err(facade::io_err(
            operation,
            io_error.to_string(),
        ))),
    }
}

/// Validates `path` with `validate_input_path`, then reads the whole file into memory.
///
/// `operation` is the label attached to a read failure by `read_canonical_file_bytes`.
///
/// # Errors
/// Propagates the validation error or the read error.
fn read_validated_file_bytes(path: &str, operation: &'static str) -> PyResult<Vec<u8>> {
    let validated_path = validate_input_path(path)?;
    let contents = read_canonical_file_bytes(&validated_path, operation)?;
    Ok(contents)
}

/// Validates `path` and returns the root produced by `facade::hash_file_streaming_root`
/// as an owned byte vector.
///
/// # Errors
/// Propagates the path-validation error. A hashing failure is wrapped by `facade::io_err`
/// under the `"hash file root"` label and converted with `super::util::py_err`.
fn hash_file_root_bytes(path: &str, chunk_size: usize, threads: usize) -> PyResult<Vec<u8>> {
    let canonical = validate_input_path(path)?;
    match facade::hash_file_streaming_root(&canonical, chunk_size, threads) {
        Ok(root) => Ok(root.0.to_vec()),
        Err(err) => Err(super::util::py_err(facade::io_err(
            "hash file root",
            format!("{err}"),
        ))),
    }
}

/// Bundle of inputs produced by `delivery_verification_inputs` for the delivery-proof
/// verification bindings.
type DeliveryVerificationInputs = (
    // Delivery proof package parsed from the caller's JSON.
    facade::DeliveryProofPackage,
    // Artifact file contents; `None` when no artifact path was supplied.
    Option<Vec<u8>>,
    // Trusted checkpoint log root parsed from hex; `None` when no root was supplied.
    Option<crate::utils::Hash>,
);

fn delivery_verification_inputs(
    delivery_proof_package_json: &str,
    artifact_path: Option<&str>,
    trusted_checkpoint_log_root_hex: Option<&str>,
    artifact_read_operation: &'static str,
) -> PyResult<DeliveryVerificationInputs> {
    let package: facade::DeliveryProofPackage =
        from_json("parse delivery proof package", delivery_proof_package_json)?;
    let artifact_bytes = artifact_path
        .map(|path| read_validated_file_bytes(path, artifact_read_operation))
        .transpose()?;
    let trusted_checkpoint_log_root = trusted_checkpoint_log_root_hex
        .map(parse_hash32_hex)
        .transpose()?;
    Ok((package, artifact_bytes, trusted_checkpoint_log_root))
}

/// Hashes `data` through `facade::blake3_hash_with_threads` with a thread count of 1 and
/// returns the digest bytes.
#[pyfunction]
pub(crate) fn blake3_hash(data: &[u8]) -> Vec<u8> {
    let digest = facade::blake3_hash_with_threads(data, 1);
    digest.0.to_vec()
}

/// Hashes `data` through `facade::blake3_hash_with_threads` using the caller-supplied
/// `num_threads` and returns the digest bytes.
#[pyfunction]
pub(crate) fn blake3_hash_mt(data: &[u8], num_threads: usize) -> Vec<u8> {
    let digest = facade::blake3_hash_with_threads(data, num_threads);
    digest.0.to_vec()
}

#[pyfunction]
pub(crate) fn blake3_content_root(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<Vec<u8>> {
    let descriptor = descriptor_for_path(path, chunk_size, num_threads, None, None, None)?;
    Ok(parse_hash32_hex(&descriptor.content_root_hash)?.0.to_vec())
}

#[pyfunction]
pub(crate) fn content_descriptor_json(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
    artifact_id: Option<&str>,
    media_type: Option<&str>,
    created_at: Option<&str>,
) -> PyResult<String> {
    let descriptor = descriptor_for_path(
        path,
        chunk_size,
        num_threads,
        artifact_id,
        media_type,
        created_at,
    )?;
    to_json("serialize content descriptor", &descriptor)
}

/// Builds a full-root proof package for the file at `path` and returns it as JSON.
///
/// The content descriptor is computed by `descriptor_for_path` (which supplies defaults for
/// omitted optional arguments) and then passed to `facade::build_full_root_proof_package`.
///
/// # Errors
/// Propagates failures from building the descriptor, building the package, or serializing
/// the package.
#[pyfunction]
#[pyo3(signature = (
    path,
    chunk_size,
    num_threads = None,
    artifact_id = None,
    media_type = None,
    created_at = None
))]
pub(crate) fn full_root_proof_package_json(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
    artifact_id: Option<&str>,
    media_type: Option<&str>,
    created_at: Option<&str>,
) -> PyResult<String> {
    let descriptor = descriptor_for_path(
        path,
        chunk_size,
        num_threads,
        artifact_id,
        media_type,
        created_at,
    )?;
    let build_outcome = facade::build_full_root_proof_package(&descriptor);
    let proof_package = into_py_result_display(build_outcome)?;
    to_json("serialize full-root proof package", &proof_package)
}

/// Generates a Bao outboard for the file at `path` and returns a JSON payload with the keys
/// `content_descriptor`, `outboard_bytes` and `outboard_hash`.
///
/// The descriptor is built with default metadata, the file is read fully into memory, and
/// the generated outboard is attached to the descriptor together with `outboard_ref`.
///
/// `num_threads` and `outboard_ref` are optional.
///
/// # Errors
/// Propagates failures from building the descriptor, reading the file, or serializing the
/// payload.
#[cfg(feature = "bao-range-proofs")]
#[pyfunction]
// Explicit signature: pins the `None` defaults for the optional arguments instead of relying
// on PyO3's version-dependent treatment of trailing `Option` arguments.
#[pyo3(signature = (path, chunk_size, num_threads = None, outboard_ref = None))]
pub(crate) fn bao_outboard_json(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
    outboard_ref: Option<&str>,
) -> PyResult<String> {
    let descriptor = descriptor_for_path(path, chunk_size, num_threads, None, None, None)?;
    let bytes = read_validated_file_bytes(path, "read file for bao outboard")?;
    let outboard = facade::generate_bao_outboard(&bytes);
    // Shadow the plain descriptor with the one that carries the outboard reference.
    let descriptor = facade::attach_bao_outboard(&descriptor, outboard_ref, &outboard);
    let payload = serde_json::json!({
        "content_descriptor": descriptor,
        "outboard_bytes": outboard.outboard_bytes,
        "outboard_hash": outboard.outboard_hash,
    });
    to_json("serialize bao outboard payload", &payload)
}

/// Builds a Bao range proof package for `len` bytes starting at `start` of the file at
/// `path`, and returns it as JSON.
///
/// The descriptor is built with default metadata, the file is read fully into memory, and
/// the generated outboard is attached under the reference `"inline://bao-outboard"` before
/// the package is built.
///
/// `num_threads` is optional.
///
/// # Errors
/// Propagates failures from building the descriptor, reading the file, building the
/// package, or serializing it.
#[cfg(feature = "bao-range-proofs")]
#[pyfunction]
// Explicit signature: pins the `None` default for `num_threads` instead of relying on PyO3's
// version-dependent treatment of trailing `Option` arguments.
#[pyo3(signature = (path, start, len, chunk_size, num_threads = None))]
pub(crate) fn bao_range_proof_package_json(
    path: &str,
    start: u64,
    len: u64,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<String> {
    let descriptor = descriptor_for_path(path, chunk_size, num_threads, None, None, None)?;
    let bytes = read_validated_file_bytes(path, "read file for bao proof package")?;
    let outboard = facade::generate_bao_outboard(&bytes);
    // Shadow the plain descriptor with the one that carries the inline outboard reference.
    let descriptor =
        facade::attach_bao_outboard(&descriptor, Some("inline://bao-outboard"), &outboard);
    let package = into_py_result_display(facade::build_bao_range_proof_package(
        &descriptor,
        &bytes,
        &outboard.outboard_bytes,
        start,
        len,
    ))?;
    to_json("serialize bao range proof package", &package)
}

/// Parses a range proof package from JSON and verifies it with
/// `facade::verify_bao_range_proof_package`, returning the byte vector that call yields.
///
/// # Errors
/// Propagates the JSON parse failure or the verification failure.
#[cfg(feature = "bao-range-proofs")]
#[pyfunction]
pub(crate) fn verify_bao_range_proof_package_json(
    range_proof_package_json: &str,
) -> PyResult<Vec<u8>> {
    let parsed_package: facade::RangeProofPackage =
        from_json("parse bao range proof package", range_proof_package_json)?;
    let verification = facade::verify_bao_range_proof_package(&parsed_package);
    into_py_result_display(verification)
}

/// Verifies a full-root proof package (given as JSON) against the contents of the file at
/// `path`, returning the byte vector yielded by `facade::verify_full_root_proof_package`.
///
/// The package is parsed before the file is validated and read fully into memory.
///
/// # Errors
/// Propagates the JSON parse failure, the file validation/read failure, or the verification
/// failure.
#[pyfunction]
pub(crate) fn verify_full_root_proof_package_json(
    range_proof_package_json: &str,
    path: &str,
) -> PyResult<Vec<u8>> {
    let parsed_package: facade::RangeProofPackage =
        from_json("parse full-root proof package", range_proof_package_json)?;
    let file_bytes = read_validated_file_bytes(path, "read file for full-root proof verification")?;
    let verification = facade::verify_full_root_proof_package(&parsed_package, &file_bytes);
    into_py_result_display(verification)
}

#[pyfunction]
pub(crate) fn build_checkpoint_manifest_json(
    checkpoint_id: &str,
    checkpoint_time_window: &str,
    published_artifacts_json: &str,
    lineage_refs: Vec<String>,
    checkpoint_signature_bundle: Vec<String>,
    prior_checkpoint_ref: Option<&str>,
) -> PyResult<String> {
    let published_artifacts: Vec<facade::CheckpointArtifact> = from_json(
        "parse published artifacts for checkpoint manifest",
        published_artifacts_json,
    )?;
    let manifest = facade::build_checkpoint_manifest(
        checkpoint_id,
        checkpoint_time_window,
        prior_checkpoint_ref,
        checkpoint_signature_bundle,
        published_artifacts,
        lineage_refs,
    );
    to_json("serialize checkpoint manifest", &manifest)
}

/// Builds a delivery proof package from a range proof package and a checkpoint manifest
/// (both given as JSON) and returns the result as JSON.
///
/// The range proof package is parsed first, then the checkpoint manifest.
///
/// # Errors
/// Propagates either JSON parse failure, the facade build failure, or the serialization
/// failure.
#[pyfunction]
pub(crate) fn build_delivery_proof_package_json(
    range_proof_package_json: &str,
    checkpoint_manifest_json: &str,
    lineage_envelopes: Vec<String>,
    signature_bundle: Vec<String>,
) -> PyResult<String> {
    let range_proof: facade::RangeProofPackage = from_json(
        "parse range proof package for delivery proof",
        range_proof_package_json,
    )?;
    let manifest: facade::CheckpointManifest = from_json(
        "parse checkpoint manifest for delivery proof",
        checkpoint_manifest_json,
    )?;

    let build_outcome = facade::build_delivery_proof_package(
        &range_proof,
        lineage_envelopes,
        manifest,
        signature_bundle,
    );
    let delivery_package = into_py_result_display(build_outcome)?;
    to_json("serialize delivery proof package", &delivery_package)
}

/// Verifies a delivery proof package given as JSON, optionally against an artifact file and
/// a trusted checkpoint log root (hex), returning the byte vector yielded by
/// `facade::verify_delivery_proof_package_against_root`.
///
/// # Errors
/// Propagates failures from preparing the inputs (JSON parse, artifact read, hex parse) or
/// from the verification itself.
#[pyfunction]
#[pyo3(signature = (delivery_proof_package_json, artifact_path=None, trusted_checkpoint_log_root_hex=None))]
pub(crate) fn verify_delivery_proof_package_json(
    delivery_proof_package_json: &str,
    artifact_path: Option<&str>,
    trusted_checkpoint_log_root_hex: Option<&str>,
) -> PyResult<Vec<u8>> {
    let inputs = delivery_verification_inputs(
        delivery_proof_package_json,
        artifact_path,
        trusted_checkpoint_log_root_hex,
        "read artifact for delivery proof verification",
    )?;
    let (package, artifact_bytes, trusted_root) = inputs;

    let verification = facade::verify_delivery_proof_package_against_root(
        &package,
        artifact_bytes.as_deref(),
        trusted_root.as_ref(),
    );
    into_py_result_display(verification)
}

/// Runs delivery proof verification and returns the verification report as JSON.
///
/// Inputs are prepared the same way as for `verify_delivery_proof_package_json`. Only the
/// `report` field of the value returned by `facade::verify_delivery_proof_package_with_report`
/// is serialized; this function does not inspect anything else on that value.
///
/// # Errors
/// Propagates failures from preparing the inputs (JSON parse, artifact read, hex parse) or
/// from serializing the report.
#[pyfunction]
#[pyo3(signature = (delivery_proof_package_json, artifact_path=None, trusted_checkpoint_log_root_hex=None))]
pub(crate) fn verify_delivery_proof_package_report_json(
    delivery_proof_package_json: &str,
    artifact_path: Option<&str>,
    trusted_checkpoint_log_root_hex: Option<&str>,
) -> PyResult<String> {
    let inputs = delivery_verification_inputs(
        delivery_proof_package_json,
        artifact_path,
        trusted_checkpoint_log_root_hex,
        "read artifact for delivery proof report verification",
    )?;
    let (package, artifact_bytes, trusted_root) = inputs;

    let outcome = facade::verify_delivery_proof_package_with_report(
        &package,
        artifact_bytes.as_deref(),
        trusted_root.as_ref(),
    );
    to_json(
        "serialize delivery proof verification report",
        &outcome.report,
    )
}

#[pyfunction]
pub(crate) fn blake3_hash_file_root(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<Vec<u8>> {
    hash_file_root_bytes(path, chunk_size, thread_count(num_threads))
}

#[pyfunction]
pub(crate) fn batch_blake3_hash(chunks: Vec<Vec<u8>>, num_threads: Option<usize>) -> Vec<Vec<u8>> {
    let threads = thread_count(num_threads);
    chunks
        .into_iter()
        .map(|chunk| facade::blake3_hash_with_threads(&chunk, threads).0.to_vec())
        .collect()
}

#[pyfunction]
pub(crate) fn batch_blake3_hash_file_roots(
    paths: Vec<String>,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<Vec<Vec<u8>>> {
    let threads = thread_count(num_threads);
    paths
        .into_iter()
        .map(|path| hash_file_root_bytes(&path, chunk_size, threads))
        .collect()
}

/// Registers the hashing, proof-package and checkpoint bindings listed below on the Python
/// module `m`.
///
/// The three Bao bindings are registered only when the `bao-range-proofs` feature is enabled.
///
/// # Errors
/// Returns the first error raised while wrapping or adding a function.
pub(crate) fn register_hashing_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // In-memory hashing, content descriptors and full-root proofs.
    m.add_function(wrap_pyfunction!(blake3_hash, m)?)?;
    m.add_function(wrap_pyfunction!(blake3_hash_mt, m)?)?;
    m.add_function(wrap_pyfunction!(blake3_content_root, m)?)?;
    m.add_function(wrap_pyfunction!(content_descriptor_json, m)?)?;
    m.add_function(wrap_pyfunction!(full_root_proof_package_json, m)?)?;

    // Feature-gated Bao outboard / range-proof bindings.
    #[cfg(feature = "bao-range-proofs")]
    {
        m.add_function(wrap_pyfunction!(bao_outboard_json, m)?)?;
        m.add_function(wrap_pyfunction!(bao_range_proof_package_json, m)?)?;
        m.add_function(wrap_pyfunction!(verify_bao_range_proof_package_json, m)?)?;
    }

    // Verification, checkpoint manifests and delivery proofs.
    m.add_function(wrap_pyfunction!(verify_full_root_proof_package_json, m)?)?;
    m.add_function(wrap_pyfunction!(build_checkpoint_manifest_json, m)?)?;
    m.add_function(wrap_pyfunction!(build_delivery_proof_package_json, m)?)?;
    m.add_function(wrap_pyfunction!(verify_delivery_proof_package_json, m)?)?;
    m.add_function(wrap_pyfunction!(
        verify_delivery_proof_package_report_json,
        m
    )?)?;

    // File-based and batch hashing.
    m.add_function(wrap_pyfunction!(blake3_hash_file_root, m)?)?;
    m.add_function(wrap_pyfunction!(batch_blake3_hash, m)?)?;
    m.add_function(wrap_pyfunction!(batch_blake3_hash_file_roots, m)?)?;

    Ok(())
}