use crate::error::{TrazaeoError, TrazaeoResult};
use crate::utils::Hash;
use crate::verification::VerificationMode;
use pyo3::exceptions::{PyOSError, PyRuntimeError, PyTypeError, PyValueError};
use pyo3::prelude::*;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::path::{Path, PathBuf};
/// Converts a crate-level [`TrazaeoError`] into the Python exception that
/// best matches its category.
///
/// Mapping:
/// - `InvalidInput` and `Validation` become `ValueError`
/// - `Io` becomes `OSError`
/// - `Serialization` becomes `TypeError`
/// - `External` becomes `RuntimeError`
///
/// Every message has the form `"<context>: <details>"`. For `Validation`
/// the individual detail entries are joined with `"; "`.
pub(crate) fn py_err(err: TrazaeoError) -> PyErr {
    match err {
        TrazaeoError::InvalidInput { context, details } => {
            let message = format!("{context}: {details}");
            PyValueError::new_err(message)
        }
        TrazaeoError::Validation { context, details } => {
            // Collapse the list of validation findings into a single line.
            let joined = details.join("; ");
            PyValueError::new_err(format!("{context}: {joined}"))
        }
        TrazaeoError::Io { context, details } => {
            let message = format!("{context}: {details}");
            PyOSError::new_err(message)
        }
        TrazaeoError::Serialization { context, details } => {
            let message = format!("{context}: {details}");
            PyTypeError::new_err(message)
        }
        TrazaeoError::External { context, details } => {
            let message = format!("{context}: {details}");
            PyRuntimeError::new_err(message)
        }
    }
}
/// Validates a caller-supplied path and returns its canonical form.
///
/// The path is accepted only if:
/// 1. it can be stat'ed without following symlinks,
/// 2. it is not itself a symlink,
/// 3. it can be canonicalized,
/// 4. the canonical path is (still) not a symlink, and
/// 5. the canonical path refers to a regular file.
///
/// # Errors
///
/// Stat and canonicalization failures are reported through the `Io` error
/// category; symlink and non-regular-file rejections through `InvalidInput`
/// (both converted with [`py_err`]).
pub(crate) fn validate_input_path(path: &str) -> PyResult<PathBuf> {
    const CONTEXT: &str = "validate input path";

    // Shared error builders so both stat/symlink checks report identically.
    let stat_failure = |e: std::io::Error| {
        py_err(TrazaeoError::io(CONTEXT, format!("failed to stat path: {e}")))
    };
    let symlink_rejected = || {
        py_err(TrazaeoError::invalid_input(
            CONTEXT,
            "symlink paths are not allowed",
        ))
    };

    // First pass: inspect the path exactly as given, without following links.
    let requested = Path::new(path);
    let requested_info = std::fs::symlink_metadata(requested).map_err(stat_failure)?;
    if requested_info.file_type().is_symlink() {
        return Err(symlink_rejected());
    }

    let resolved = requested.canonicalize().map_err(|e| {
        py_err(TrazaeoError::io(CONTEXT, format!("invalid path: {e}")))
    })?;

    // Second pass: re-check the canonical path before handing it out.
    let resolved_info = std::fs::symlink_metadata(&resolved).map_err(stat_failure)?;
    if resolved_info.file_type().is_symlink() {
        return Err(symlink_rejected());
    }

    if resolved_info.is_file() {
        Ok(resolved)
    } else {
        Err(py_err(TrazaeoError::invalid_input(
            CONTEXT,
            "path must point to a regular file",
        )))
    }
}
/// Parses a hex string into a 32-byte [`Hash`].
///
/// # Errors
///
/// Returns an `InvalidInput`-derived Python error when `value` is not valid
/// hex, or when it does not decode to exactly 32 bytes.
pub(crate) fn parse_hash32_hex(value: &str) -> PyResult<Hash> {
    let decoded = match hex::decode(value) {
        Ok(raw) => raw,
        Err(e) => {
            return Err(py_err(TrazaeoError::invalid_input(
                "parse hash hex",
                format!("invalid hex hash: {e}"),
            )));
        }
    };

    // The conversion only succeeds when the decoded length is exactly 32.
    match <[u8; 32]>::try_from(decoded.as_slice()) {
        Ok(digest) => Ok(Hash(digest)),
        Err(_) => Err(py_err(TrazaeoError::invalid_input(
            "parse hash hex",
            "hash must be exactly 32 bytes (64 hex chars)",
        ))),
    }
}
/// Translates a mode string into a [`VerificationMode`].
///
/// Accepts exactly `"sampled"` or `"full"` (case-sensitive, no trimming).
///
/// # Errors
///
/// Any other string yields an `InvalidInput`-derived Python error.
pub(crate) fn parse_verification_mode(mode: &str) -> PyResult<VerificationMode> {
    let recognized = match mode {
        "sampled" => Some(VerificationMode::Sampled),
        "full" => Some(VerificationMode::Full),
        _ => None,
    };

    recognized.ok_or_else(|| {
        py_err(TrazaeoError::invalid_input(
            "parse verification mode",
            "mode must be 'sampled' or 'full'",
        ))
    })
}
pub(crate) fn hash_to_hex(hash: &Hash) -> String {
hex::encode(hash.0)
}
pub(crate) fn from_json<T: DeserializeOwned>(context: &'static str, value: &str) -> PyResult<T> {
serde_json::from_str(value).map_err(|e| {
py_err(TrazaeoError::serialization(
context,
format!("invalid json: {e}"),
))
})
}
/// Serializes `value` to a JSON string using `serde_json::to_string`.
///
/// `context` is used as the context label of the error raised on failure.
///
/// # Errors
///
/// A serialization failure is reported through the `Serialization` error
/// category (converted with [`py_err`]) with the message
/// `"failed to serialize json: <cause>"`.
pub(crate) fn to_json<T: Serialize>(context: &'static str, value: &T) -> PyResult<String> {
    match serde_json::to_string(value) {
        Ok(encoded) => Ok(encoded),
        Err(e) => Err(py_err(TrazaeoError::serialization(
            context,
            format!("failed to serialize json: {e}"),
        ))),
    }
}
pub(crate) fn into_py_result<T>(result: TrazaeoResult<T>) -> PyResult<T> {
result.map_err(py_err)
}
/// Adapts any `Result` whose error implements `Display` into a `PyResult`.
///
/// The error is rendered with `to_string()` and raised as a Python
/// `RuntimeError`; the success value is passed through untouched.
pub(crate) fn into_py_result_display<T, E: std::fmt::Display>(result: Result<T, E>) -> PyResult<T> {
    match result {
        Ok(value) => Ok(value),
        Err(failure) => Err(PyRuntimeError::new_err(failure.to_string())),
    }
}