use super::util::{
from_json, into_py_result_display, parse_hash32_hex, to_json, validate_input_path,
};
use crate::python_facade as facade;
use pyo3::prelude::*;
use pyo3::types::PyModule;
use pyo3::wrap_pyfunction;
/// Builds the content descriptor for the file at `path`.
///
/// The path is first checked with `validate_input_path`; every optional
/// argument that is `None` is replaced by a fixed fallback before the facade
/// is called:
///
/// * `num_threads` -> `1`
/// * `artifact_id` -> the final component of the validated path, or
///   `"artifact"` when that component is missing or is not valid UTF-8
/// * `media_type` -> `"application/octet-stream"`
/// * `created_at` -> `"1970-01-01T00:00:00Z"`
///
/// # Errors
///
/// Propagates the error from path validation, or the facade failure after it
/// has been converted by `into_py_result_display`.
fn descriptor_for_path(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
    artifact_id: Option<&str>,
    media_type: Option<&str>,
    created_at: Option<&str>,
) -> PyResult<crate::content::ContentDescriptor> {
    let resolved_path = validate_input_path(path)?;
    let worker_count = num_threads.unwrap_or(1);

    // An explicit id always wins; otherwise derive one from the file name.
    let effective_artifact_id = match artifact_id {
        Some(explicit) => explicit,
        None => resolved_path
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or("artifact"),
    };
    let effective_media_type = media_type.unwrap_or("application/octet-stream");
    let effective_created_at = created_at.unwrap_or("1970-01-01T00:00:00Z");

    let outcome = facade::content_descriptor_for_path(
        &resolved_path,
        chunk_size,
        worker_count,
        effective_artifact_id,
        effective_media_type,
        effective_created_at,
    );
    into_py_result_display(outcome)
}
/// Reads the whole file at `canonical` into memory.
///
/// `operation` is a short label handed to `facade::io_err` together with the
/// text of the underlying I/O error, so the resulting Python exception says
/// which step failed.
///
/// # Errors
///
/// Returns the error built by `super::util::py_err` when `std::fs::read`
/// fails.
fn read_canonical_file_bytes(
    canonical: &std::path::Path,
    operation: &'static str,
) -> PyResult<Vec<u8>> {
    match std::fs::read(canonical) {
        Ok(contents) => Ok(contents),
        Err(io_failure) => Err(super::util::py_err(facade::io_err(
            operation,
            io_failure.to_string(),
        ))),
    }
}
/// Hashes `data` through `facade::blake3_hash_with_threads` using a thread
/// count of 1 and returns the digest bytes.
#[pyfunction]
pub(crate) fn blake3_hash(data: &[u8]) -> Vec<u8> {
    let digest = facade::blake3_hash_with_threads(data, 1);
    digest.0.to_vec()
}
/// Hashes `data` through `facade::blake3_hash_with_threads`, forwarding
/// `num_threads` unchanged, and returns the digest bytes.
#[pyfunction]
pub(crate) fn blake3_hash_mt(data: &[u8], num_threads: usize) -> Vec<u8> {
    let digest = facade::blake3_hash_with_threads(data, num_threads);
    digest.0.to_vec()
}
/// Returns the content root hash of the file at `path` as raw bytes.
///
/// A content descriptor is built with `descriptor_for_path` (default artifact
/// id, media type and timestamp), and its hex `content_root_hash` field is
/// decoded with `parse_hash32_hex`.
///
/// `num_threads` may be omitted or `None`; `descriptor_for_path` then uses 1.
/// The default is declared explicitly so the Python signature does not depend
/// on how the installed PyO3 version treats trailing `Option` parameters.
///
/// # Errors
///
/// Propagates failures from building the descriptor or from decoding the hex
/// root hash.
#[pyfunction(signature = (path, chunk_size, num_threads=None))]
pub(crate) fn blake3_content_root(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<Vec<u8>> {
    let descriptor = descriptor_for_path(path, chunk_size, num_threads, None, None, None)?;
    let root = parse_hash32_hex(&descriptor.content_root_hash)?;
    Ok(root.0.to_vec())
}
/// Builds the content descriptor for the file at `path` and returns it as a
/// JSON string.
///
/// `num_threads`, `artifact_id`, `media_type` and `created_at` may each be
/// omitted or `None`; `descriptor_for_path` substitutes its fallbacks in that
/// case. The defaults are declared explicitly so the Python signature does
/// not depend on how the installed PyO3 version treats trailing `Option`
/// parameters.
///
/// # Errors
///
/// Propagates failures from building the descriptor or from serializing it.
#[pyfunction(signature = (
    path,
    chunk_size,
    num_threads=None,
    artifact_id=None,
    media_type=None,
    created_at=None
))]
pub(crate) fn content_descriptor_json(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
    artifact_id: Option<&str>,
    media_type: Option<&str>,
    created_at: Option<&str>,
) -> PyResult<String> {
    let descriptor = descriptor_for_path(
        path,
        chunk_size,
        num_threads,
        artifact_id,
        media_type,
        created_at,
    )?;
    to_json("serialize content descriptor", &descriptor)
}
/// Builds a full-root proof package for the file at `path` and returns it as
/// a JSON string.
///
/// The descriptor is produced by `descriptor_for_path` with default artifact
/// id, media type and timestamp, then handed to
/// `facade::build_full_root_proof_package`.
///
/// `num_threads` may be omitted or `None`; `descriptor_for_path` then uses 1.
/// The default is declared explicitly so the Python signature does not depend
/// on how the installed PyO3 version treats trailing `Option` parameters.
///
/// # Errors
///
/// Propagates failures from building the descriptor, building the package, or
/// serializing it.
#[pyfunction(signature = (path, chunk_size, num_threads=None))]
pub(crate) fn full_root_proof_package_json(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<String> {
    let descriptor = descriptor_for_path(path, chunk_size, num_threads, None, None, None)?;
    let package = into_py_result_display(facade::build_full_root_proof_package(&descriptor))?;
    to_json("serialize full-root proof package", &package)
}
/// Generates a Bao outboard for the file at `path` and returns a JSON object
/// with the keys `content_descriptor`, `outboard_bytes` and `outboard_hash`.
///
/// The descriptor comes from `descriptor_for_path` (default artifact id, media
/// type and timestamp) and is then passed through
/// `facade::attach_bao_outboard` together with `outboard_ref`.
///
/// `num_threads` and `outboard_ref` may be omitted or `None`. The defaults are
/// declared explicitly so the Python signature does not depend on how the
/// installed PyO3 version treats trailing `Option` parameters.
///
/// Only compiled with the `bao-range-proofs` feature.
///
/// # Errors
///
/// Propagates failures from path validation, descriptor construction, reading
/// the file, or JSON serialization.
#[cfg(feature = "bao-range-proofs")]
#[pyfunction(signature = (path, chunk_size, num_threads=None, outboard_ref=None))]
pub(crate) fn bao_outboard_json(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
    outboard_ref: Option<&str>,
) -> PyResult<String> {
    // Validated here for the direct read below; `descriptor_for_path`
    // validates the same path again on its own.
    let canonical = validate_input_path(path)?;
    let descriptor = descriptor_for_path(path, chunk_size, num_threads, None, None, None)?;
    // NOTE(review): the descriptor is computed from the path while the outboard
    // is computed from this separate read; presumably the file must not change
    // in between - confirm.
    let bytes = read_canonical_file_bytes(&canonical, "read file for bao outboard")?;
    let outboard = facade::generate_bao_outboard(&bytes);
    let descriptor = facade::attach_bao_outboard(&descriptor, outboard_ref, &outboard);
    let payload = serde_json::json!({
        "content_descriptor": descriptor,
        "outboard_bytes": outboard.outboard_bytes,
        "outboard_hash": outboard.outboard_hash,
    });
    to_json("serialize bao outboard payload", &payload)
}
/// Builds a Bao range proof package covering `len` bytes starting at `start`
/// of the file at `path`, and returns it as a JSON string.
///
/// The descriptor comes from `descriptor_for_path` (default artifact id, media
/// type and timestamp) and has a freshly generated outboard attached under the
/// reference `"inline://bao-outboard"` before the package is built.
///
/// `num_threads` may be omitted or `None`; `descriptor_for_path` then uses 1.
/// The default is declared explicitly so the Python signature does not depend
/// on how the installed PyO3 version treats trailing `Option` parameters.
///
/// Only compiled with the `bao-range-proofs` feature.
///
/// # Errors
///
/// Propagates failures from path validation, descriptor construction, reading
/// the file, building the package, or JSON serialization.
#[cfg(feature = "bao-range-proofs")]
#[pyfunction(signature = (path, start, len, chunk_size, num_threads=None))]
pub(crate) fn bao_range_proof_package_json(
    path: &str,
    start: u64,
    len: u64,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<String> {
    // Validated here for the direct read below; `descriptor_for_path`
    // validates the same path again on its own.
    let canonical = validate_input_path(path)?;
    let descriptor = descriptor_for_path(path, chunk_size, num_threads, None, None, None)?;
    let bytes = read_canonical_file_bytes(&canonical, "read file for bao proof package")?;
    let outboard = facade::generate_bao_outboard(&bytes);
    let descriptor =
        facade::attach_bao_outboard(&descriptor, Some("inline://bao-outboard"), &outboard);
    let package = into_py_result_display(facade::build_bao_range_proof_package(
        &descriptor,
        &bytes,
        &outboard.outboard_bytes,
        start,
        len,
    ))?;
    to_json("serialize bao range proof package", &package)
}
/// Parses `range_proof_package_json` as a range proof package and runs
/// `facade::verify_bao_range_proof_package` on it.
///
/// On success the bytes produced by the facade verifier are returned as-is.
///
/// Only compiled with the `bao-range-proofs` feature.
///
/// # Errors
///
/// Propagates JSON parse failures and the facade's verification error.
#[cfg(feature = "bao-range-proofs")]
#[pyfunction]
pub(crate) fn verify_bao_range_proof_package_json(
    range_proof_package_json: &str,
) -> PyResult<Vec<u8>> {
    let parsed_package: facade::RangeProofPackage =
        from_json("parse bao range proof package", range_proof_package_json)?;
    let verdict = facade::verify_bao_range_proof_package(&parsed_package);
    into_py_result_display(verdict)
}
/// Verifies a full-root proof package against the contents of the file at
/// `path`.
///
/// Steps, in order: validate `path`, parse `range_proof_package_json`, read
/// the whole file, then call `facade::verify_full_root_proof_package`. On
/// success the bytes produced by the facade verifier are returned as-is.
///
/// # Errors
///
/// Propagates the first failure among path validation, JSON parsing, reading
/// the file, and verification.
#[pyfunction]
pub(crate) fn verify_full_root_proof_package_json(
    range_proof_package_json: &str,
    path: &str,
) -> PyResult<Vec<u8>> {
    let resolved_path = validate_input_path(path)?;
    let parsed_package: facade::RangeProofPackage =
        from_json("parse full-root proof package", range_proof_package_json)?;
    let file_contents = read_canonical_file_bytes(
        &resolved_path,
        "read file for full-root proof verification",
    )?;
    let verdict = facade::verify_full_root_proof_package(&parsed_package, &file_contents);
    into_py_result_display(verdict)
}
/// Builds a checkpoint manifest and returns it as a JSON string.
///
/// `published_artifacts_json` is parsed into a list of
/// `facade::CheckpointArtifact`; all other arguments are forwarded to
/// `facade::build_checkpoint_manifest` unchanged.
///
/// `prior_checkpoint_ref` may be omitted or `None`. The default is declared
/// explicitly so the Python signature does not depend on how the installed
/// PyO3 version treats trailing `Option` parameters.
///
/// # Errors
///
/// Propagates failures from parsing `published_artifacts_json` or from
/// serializing the manifest.
#[pyfunction(signature = (
    checkpoint_id,
    checkpoint_time_window,
    published_artifacts_json,
    lineage_refs,
    checkpoint_signature_bundle,
    prior_checkpoint_ref=None
))]
pub(crate) fn build_checkpoint_manifest_json(
    checkpoint_id: &str,
    checkpoint_time_window: &str,
    published_artifacts_json: &str,
    lineage_refs: Vec<String>,
    checkpoint_signature_bundle: Vec<String>,
    prior_checkpoint_ref: Option<&str>,
) -> PyResult<String> {
    let published_artifacts: Vec<facade::CheckpointArtifact> = from_json(
        "parse published artifacts for checkpoint manifest",
        published_artifacts_json,
    )?;
    // The facade takes its arguments in a different order than this binding.
    let manifest = facade::build_checkpoint_manifest(
        checkpoint_id,
        checkpoint_time_window,
        prior_checkpoint_ref,
        checkpoint_signature_bundle,
        published_artifacts,
        lineage_refs,
    );
    to_json("serialize checkpoint manifest", &manifest)
}
/// Combines a range proof package and a checkpoint manifest (both given as
/// JSON) with the supplied lineage envelopes and signature bundle into a
/// delivery proof package, returned as a JSON string.
///
/// # Errors
///
/// Propagates failures from parsing either JSON input, from
/// `facade::build_delivery_proof_package`, or from serializing the result.
#[pyfunction]
pub(crate) fn build_delivery_proof_package_json(
    range_proof_package_json: &str,
    checkpoint_manifest_json: &str,
    lineage_envelopes: Vec<String>,
    signature_bundle: Vec<String>,
) -> PyResult<String> {
    let parsed_range_package: facade::RangeProofPackage = from_json(
        "parse range proof package for delivery proof",
        range_proof_package_json,
    )?;
    let parsed_manifest: facade::CheckpointManifest = from_json(
        "parse checkpoint manifest for delivery proof",
        checkpoint_manifest_json,
    )?;

    let build_outcome = facade::build_delivery_proof_package(
        &parsed_range_package,
        lineage_envelopes,
        parsed_manifest,
        signature_bundle,
    );
    let delivery_package = into_py_result_display(build_outcome)?;
    to_json("serialize delivery proof package", &delivery_package)
}
#[pyfunction]
#[pyo3(signature = (delivery_proof_package_json, artifact_path=None, trusted_checkpoint_log_root_hex=None))]
pub(crate) fn verify_delivery_proof_package_json(
delivery_proof_package_json: &str,
artifact_path: Option<&str>,
trusted_checkpoint_log_root_hex: Option<&str>,
) -> PyResult<Vec<u8>> {
let package: facade::DeliveryProofPackage =
from_json("parse delivery proof package", delivery_proof_package_json)?;
let artifact_bytes = artifact_path
.map(|path| {
let canonical = validate_input_path(path)?;
read_canonical_file_bytes(&canonical, "read artifact for delivery proof verification")
})
.transpose()?;
let trusted_checkpoint_log_root = trusted_checkpoint_log_root_hex
.map(parse_hash32_hex)
.transpose()?;
into_py_result_display(facade::verify_delivery_proof_package_against_root(
&package,
artifact_bytes.as_deref(),
trusted_checkpoint_log_root.as_ref(),
))
}
#[pyfunction]
#[pyo3(signature = (delivery_proof_package_json, artifact_path=None, trusted_checkpoint_log_root_hex=None))]
pub(crate) fn verify_delivery_proof_package_report_json(
delivery_proof_package_json: &str,
artifact_path: Option<&str>,
trusted_checkpoint_log_root_hex: Option<&str>,
) -> PyResult<String> {
let package: facade::DeliveryProofPackage =
from_json("parse delivery proof package", delivery_proof_package_json)?;
let artifact_bytes = artifact_path
.map(|path| {
let canonical = validate_input_path(path)?;
read_canonical_file_bytes(
&canonical,
"read artifact for delivery proof report verification",
)
})
.transpose()?;
let trusted_checkpoint_log_root = trusted_checkpoint_log_root_hex
.map(parse_hash32_hex)
.transpose()?;
let result = facade::verify_delivery_proof_package_with_report(
&package,
artifact_bytes.as_deref(),
trusted_checkpoint_log_root.as_ref(),
);
to_json(
"serialize delivery proof verification report",
&result.report,
)
}
/// Computes the root hash of the file at `path` with
/// `facade::hash_file_streaming_root` and returns it as raw bytes.
///
/// `num_threads` may be omitted or `None`, in which case 1 is used. The
/// default is declared explicitly so the Python signature does not depend on
/// how the installed PyO3 version treats trailing `Option` parameters.
///
/// # Errors
///
/// Propagates path validation failures; a hashing failure is reported through
/// `facade::io_err` with the label `"hash file root"`.
#[pyfunction(signature = (path, chunk_size, num_threads=None))]
pub(crate) fn blake3_hash_file_root(
    path: &str,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<Vec<u8>> {
    let canonical = validate_input_path(path)?;
    let threads = num_threads.unwrap_or(1);
    let root = facade::hash_file_streaming_root(&canonical, chunk_size, threads)
        .map_err(|err| super::util::py_err(facade::io_err("hash file root", err.to_string())))?;
    Ok(root.0.to_vec())
}
/// Hashes every buffer in `chunks` with `facade::blake3_hash_with_threads`
/// and returns the digests in input order.
///
/// Buffers are processed one after another; `num_threads` is forwarded to
/// each individual hash call. It may be omitted or `None`, in which case 1 is
/// used. The default is declared explicitly so the Python signature does not
/// depend on how the installed PyO3 version treats trailing `Option`
/// parameters.
#[pyfunction(signature = (chunks, num_threads=None))]
pub(crate) fn batch_blake3_hash(chunks: Vec<Vec<u8>>, num_threads: Option<usize>) -> Vec<Vec<u8>> {
    let threads = num_threads.unwrap_or(1);
    chunks
        .into_iter()
        .map(|chunk| facade::blake3_hash_with_threads(&chunk, threads).0.to_vec())
        .collect()
}
/// Computes the root hash of every file in `paths` with
/// `facade::hash_file_streaming_root` and returns the roots in input order.
///
/// Files are processed one after another and the first failure aborts the
/// whole batch. `num_threads` is forwarded to each individual hash call; it
/// may be omitted or `None`, in which case 1 is used. The default is declared
/// explicitly so the Python signature does not depend on how the installed
/// PyO3 version treats trailing `Option` parameters.
///
/// # Errors
///
/// Propagates the first path validation failure; a hashing failure is reported
/// through `facade::io_err` with the label `"hash file root"`.
#[pyfunction(signature = (paths, chunk_size, num_threads=None))]
pub(crate) fn batch_blake3_hash_file_roots(
    paths: Vec<String>,
    chunk_size: usize,
    num_threads: Option<usize>,
) -> PyResult<Vec<Vec<u8>>> {
    let threads = num_threads.unwrap_or(1);
    paths
        .into_iter()
        .map(|path| {
            let canonical = validate_input_path(&path)?;
            let root = facade::hash_file_streaming_root(&canonical, chunk_size, threads).map_err(
                |err| super::util::py_err(facade::io_err("hash file root", err.to_string())),
            )?;
            Ok(root.0.to_vec())
        })
        .collect()
}
/// Adds the hashing and proof functions listed below to the Python module `m`.
///
/// The three Bao functions are registered only when the crate is compiled with
/// the `bao-range-proofs` feature.
///
/// # Errors
///
/// Returns the first error raised while wrapping a function or adding it to
/// the module.
pub(crate) fn register_hashing_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // In-memory hashing, content descriptors and full-root proofs.
    m.add_function(wrap_pyfunction!(blake3_hash, m)?)?;
    m.add_function(wrap_pyfunction!(blake3_hash_mt, m)?)?;
    m.add_function(wrap_pyfunction!(blake3_content_root, m)?)?;
    m.add_function(wrap_pyfunction!(content_descriptor_json, m)?)?;
    m.add_function(wrap_pyfunction!(full_root_proof_package_json, m)?)?;

    // Feature-gated Bao outboard and range-proof bindings.
    #[cfg(feature = "bao-range-proofs")]
    {
        m.add_function(wrap_pyfunction!(bao_outboard_json, m)?)?;
        m.add_function(wrap_pyfunction!(bao_range_proof_package_json, m)?)?;
        m.add_function(wrap_pyfunction!(verify_bao_range_proof_package_json, m)?)?;
    }

    // Checkpoint and delivery-proof bindings.
    m.add_function(wrap_pyfunction!(verify_full_root_proof_package_json, m)?)?;
    m.add_function(wrap_pyfunction!(build_checkpoint_manifest_json, m)?)?;
    m.add_function(wrap_pyfunction!(build_delivery_proof_package_json, m)?)?;
    m.add_function(wrap_pyfunction!(verify_delivery_proof_package_json, m)?)?;
    m.add_function(wrap_pyfunction!(verify_delivery_proof_package_report_json, m)?)?;

    // File-based and batch hashing.
    m.add_function(wrap_pyfunction!(blake3_hash_file_root, m)?)?;
    m.add_function(wrap_pyfunction!(batch_blake3_hash, m)?)?;
    m.add_function(wrap_pyfunction!(batch_blake3_hash_file_roots, m)?)?;
    Ok(())
}