trazaeo 0.5.0

Open-source provenance SDK and specification for verifiable EO and climate data workflows
Documentation
use super::util::{from_json, hash_to_hex, into_py_result, parse_hash32_hex, to_json};
use crate::python_facade as facade;
#[cfg(feature = "python-proof-log-rpc")]
use crate::python_facade::PublicRpcSolanaProofLogAdaptor;
use pyo3::prelude::*;
use pyo3::types::PyModule;

/// Python-facing wrapper, exported to Python under the name `S3StorageAdaptor`.
///
/// Every exposed method delegates to the wrapped
/// `facade::DeterministicS3StorageAdaptor`.
#[pyclass(name = "S3StorageAdaptor")]
pub(crate) struct PyS3StorageAdaptor {
    /// Facade storage adaptor built once in the constructor and reused by all methods.
    adaptor: facade::DeterministicS3StorageAdaptor,
}

#[pymethods]
impl PyS3StorageAdaptor {
    #[new]
    fn new(bucket: String, base_prefix: Option<String>) -> Self {
        Self {
            adaptor: facade::DeterministicS3StorageAdaptor::new(&bucket, base_prefix.as_deref()),
        }
    }

    fn build_key(&self, object_name: &str) -> String {
        self.adaptor.build_key(object_name)
    }

    fn build_uri(&self, key: &str) -> String {
        self.adaptor.build_uri(key)
    }

    fn put_bytes_json(&self, key: &str, bytes: &[u8], content_type: &str) -> PyResult<String> {
        let stored: facade::StoredObject = into_py_result(facade::StorageAdaptor::put_bytes(
            &self.adaptor,
            &facade::S3PutRequest {
                key: key.to_string(),
                bytes: bytes.to_vec(),
                content_type: content_type.to_string(),
            },
        ))?;
        to_json("store bytes receipt", &stored)
    }
}

/// Python-facing wrapper, exported to Python under the name `SolanaProofLogAdaptor`.
///
/// It stores only a `facade::SolanaClient`; each method clones that client to
/// build a short-lived `facade::SolanaProofLogAdaptor` for the call.
#[pyclass(name = "SolanaProofLogAdaptor")]
pub(crate) struct PySolanaProofLogAdaptor {
    /// Client created by `facade::init_solana_client` in the constructor.
    client: facade::SolanaClient,
}

/// Python-facing wrapper, exported to Python under the name
/// `PublicRpcSolanaProofLogAdaptor`.
///
/// Only compiled when the `python-proof-log-rpc` feature is enabled.
#[cfg(feature = "python-proof-log-rpc")]
#[pyclass(name = "PublicRpcSolanaProofLogAdaptor")]
pub(crate) struct PyPublicRpcSolanaProofLogAdaptor {
    /// Wrapped public-RPC adaptor that all methods delegate to.
    adaptor: PublicRpcSolanaProofLogAdaptor,
}

impl PySolanaProofLogAdaptor {
    /// Parse the JSON and hex inputs, log the publish proof through a freshly
    /// built `facade::SolanaProofLogAdaptor`, and return the result as JSON.
    ///
    /// When `attestor_key_ref` is `None` the hex public key string is used as
    /// the key reference; when `prev_entry_hash_hex` is `None` an all-zero
    /// 32-byte hash is passed instead.
    ///
    /// # Errors
    ///
    /// Fails if the envelope JSON or either hex string cannot be parsed, if
    /// the facade call reports an error, or if the result cannot be serialised.
    fn commit_publish_proof_inner(
        &self,
        publish_envelope_json: &str,
        attestor_pubkey_hex: &str,
        committed_at: &str,
        committed_unix_seconds: i64,
        prev_entry_hash_hex: Option<&str>,
        attestor_key_ref: Option<&str>,
    ) -> PyResult<String> {
        let publish_envelope: facade::PublishEnvelope = from_json(
            "parse publish envelope for local proof log",
            publish_envelope_json,
        )?;
        let pubkey_bytes = parse_hash32_hex(attestor_pubkey_hex)?.0;

        // Fall back to the hex public key when no explicit key reference is given.
        let key_ref = match attestor_key_ref {
            Some(explicit) => explicit,
            None => attestor_pubkey_hex,
        };
        let proof_log =
            facade::SolanaProofLogAdaptor::new(self.client.clone(), pubkey_bytes, key_ref);

        // A missing previous-entry hash is represented by 32 zero bytes.
        let previous_hash = match prev_entry_hash_hex {
            Some(hex_text) => parse_hash32_hex(hex_text)?.0,
            None => [0u8; 32],
        };

        let logged = facade::ProofLogAdaptor::log_publish_proof(
            &proof_log,
            &publish_envelope,
            committed_at,
            committed_unix_seconds,
            previous_hash,
        );
        let publish_result: facade::ProofLogPublishResult = into_py_result(logged)?;
        to_json("serialize local proof-log result", &publish_result)
    }

    /// Parse the envelope and commitment JSON and check them against each
    /// other via `facade::ProofLogAdaptor::verify_publish_proof`.
    ///
    /// # Errors
    ///
    /// Fails if either JSON document cannot be parsed or if the facade
    /// verification reports an error.
    fn verify_publish_proof_inner(
        &self,
        publish_envelope_json: &str,
        commitment_json: &str,
    ) -> PyResult<()> {
        let publish_envelope: facade::PublishEnvelope = from_json(
            "parse publish envelope for local proof-log verification",
            publish_envelope_json,
        )?;
        let proof_commitment: facade::ProofLogCommitment = from_json(
            "parse proof-log commitment for local proof-log verification",
            commitment_json,
        )?;

        // The adaptor is built with a zeroed attestor key and the "verify" label.
        let verifier = facade::SolanaProofLogAdaptor::new(self.client.clone(), [0u8; 32], "verify");
        let outcome = facade::ProofLogAdaptor::verify_publish_proof(
            &verifier,
            &publish_envelope,
            &proof_commitment,
        );
        into_py_result(outcome)
    }
}

#[pymethods]
impl PySolanaProofLogAdaptor {
    /// Create the wrapper from a cluster name and a program id.
    ///
    /// Both values are moved into a `facade::SolanaConfig` that is passed to
    /// `facade::init_solana_client`.
    #[new]
    fn new(cluster: String, program_id_value: String) -> Self {
        Self {
            client: facade::init_solana_client(&facade::SolanaConfig {
                cluster,
                program_id: program_id_value,
            }),
        }
    }

    /// Return the cluster name reported by `facade::solana_cluster_name`.
    fn cluster(&self) -> String {
        facade::solana_cluster_name(&self.client)
    }

    /// Return the program id reported by `facade::solana_program_id`.
    fn program_id(&self) -> String {
        facade::solana_program_id(&self.client)
    }

    /// Return the adaptor's chain root as a hex string.
    ///
    /// # Errors
    ///
    /// Returns the Python error produced by `into_py_result` when the
    /// chain-root query fails.
    fn chain_root_hex(&self) -> PyResult<String> {
        // Queries use a zeroed attestor key and the "query" label.
        // NOTE(review): presumably neither is consulted for reads — confirm.
        let adaptor = facade::SolanaProofLogAdaptor::new(self.client.clone(), [0u8; 32], "query");
        Ok(hash_to_hex(&into_py_result(adaptor.chain_root())?))
    }

    /// Log a publish proof and return the result as a JSON string.
    ///
    /// `prev_entry_hash_hex` and `attestor_key_ref` are optional from Python.
    /// Their `None` defaults are declared explicitly, matching the
    /// explicit-signature style of the public-RPC adaptor's constructor, so
    /// callers may keep omitting them without relying on PyO3's implicit
    /// handling of trailing `Option<T>` parameters.
    ///
    /// # Errors
    ///
    /// Propagates any error from `commit_publish_proof_inner`.
    #[pyo3(name = "commit_publish_proof")]
    #[pyo3(signature = (
        publish_envelope_json,
        attestor_pubkey_hex,
        committed_at,
        committed_unix_seconds,
        prev_entry_hash_hex=None,
        attestor_key_ref=None
    ))]
    fn commit_publish_proof(
        &self,
        publish_envelope_json: &str,
        attestor_pubkey_hex: &str,
        committed_at: &str,
        committed_unix_seconds: i64,
        prev_entry_hash_hex: Option<&str>,
        attestor_key_ref: Option<&str>,
    ) -> PyResult<String> {
        self.commit_publish_proof_inner(
            publish_envelope_json,
            attestor_pubkey_hex,
            committed_at,
            committed_unix_seconds,
            prev_entry_hash_hex,
            attestor_key_ref,
        )
    }

    /// Verify a publish envelope against a proof-log commitment, both given as JSON.
    ///
    /// # Errors
    ///
    /// Propagates any error from `verify_publish_proof_inner`.
    #[pyo3(name = "verify_publish_proof")]
    fn verify_publish_proof(
        &self,
        publish_envelope_json: &str,
        commitment_json: &str,
    ) -> PyResult<()> {
        self.verify_publish_proof_inner(publish_envelope_json, commitment_json)
    }

    /// Look up a transaction by `signature`.
    ///
    /// Returns `None` when the adaptor has no such transaction, otherwise the
    /// transaction serialised as JSON.
    ///
    /// # Errors
    ///
    /// Fails if the lookup reports an error or the value cannot be serialised.
    fn get_transaction_json(&self, signature: &str) -> PyResult<Option<String>> {
        let adaptor = facade::SolanaProofLogAdaptor::new(self.client.clone(), [0u8; 32], "query");
        into_py_result(adaptor.get_transaction(signature))?
            .map(|value| to_json("serialize transaction result", &value))
            .transpose()
    }

    /// Look up a proof-log account by its `pda` locator.
    ///
    /// Returns `None` when the adaptor has no such account, otherwise the
    /// account serialised as JSON.
    ///
    /// # Errors
    ///
    /// Fails if the lookup reports an error or the value cannot be serialised.
    #[pyo3(name = "get_proof_log_account_json")]
    fn get_proof_log_account_json(&self, pda: &str) -> PyResult<Option<String>> {
        let adaptor = facade::SolanaProofLogAdaptor::new(self.client.clone(), [0u8; 32], "query");
        into_py_result(adaptor.get_proof_log_account(pda))?
            .map(|value| to_json("serialize proof-log account", &value))
            .transpose()
    }
}

#[cfg(feature = "python-proof-log-rpc")]
impl PyPublicRpcSolanaProofLogAdaptor {
    /// Parse the publish envelope, log it through the wrapped public-RPC
    /// adaptor, and return the result serialised as JSON.
    ///
    /// An all-zero 32-byte array is always passed as the final argument of
    /// `log_publish_proof`.
    ///
    /// # Errors
    ///
    /// Fails if the envelope JSON cannot be parsed, if the facade call
    /// reports an error, or if the result cannot be serialised.
    fn commit_publish_proof_inner(
        &self,
        publish_envelope_json: &str,
        committed_at: &str,
        committed_unix_seconds: i64,
    ) -> PyResult<String> {
        let publish_envelope: facade::PublishEnvelope = from_json(
            "parse publish envelope for public RPC proof log",
            publish_envelope_json,
        )?;
        let logged = facade::ProofLogAdaptor::log_publish_proof(
            &self.adaptor,
            &publish_envelope,
            committed_at,
            committed_unix_seconds,
            [0u8; 32],
        );
        let publish_result: facade::ProofLogPublishResult = into_py_result(logged)?;
        to_json("serialize public RPC proof-log result", &publish_result)
    }

    /// Parse the envelope and commitment JSON and check them against each
    /// other via the wrapped public-RPC adaptor.
    ///
    /// # Errors
    ///
    /// Fails if either JSON document cannot be parsed or if the facade
    /// verification reports an error.
    fn verify_publish_proof_inner(
        &self,
        publish_envelope_json: &str,
        commitment_json: &str,
    ) -> PyResult<()> {
        let publish_envelope: facade::PublishEnvelope = from_json(
            "parse publish envelope for public RPC proof-log verification",
            publish_envelope_json,
        )?;
        let proof_commitment: facade::ProofLogCommitment = from_json(
            "parse proof-log commitment for public RPC proof-log verification",
            commitment_json,
        )?;
        let outcome = facade::ProofLogAdaptor::verify_publish_proof(
            &self.adaptor,
            &publish_envelope,
            &proof_commitment,
        );
        into_py_result(outcome)
    }
}

#[cfg(feature = "python-proof-log-rpc")]
#[pymethods]
impl PyPublicRpcSolanaProofLogAdaptor {
    /// Create the wrapper for `rpc_url`.
    ///
    /// `cluster` falls back to `"solana-devnet"` when omitted. When
    /// `keypair_path` is given the adaptor is built with `from_keypair_path`,
    /// otherwise with the plain `new` constructor.
    ///
    /// # Errors
    ///
    /// Returns the Python error produced by `into_py_result` when the chosen
    /// constructor fails.
    #[new]
    #[pyo3(signature = (rpc_url, cluster=None, keypair_path=None))]
    fn new(
        rpc_url: String,
        cluster: Option<String>,
        keypair_path: Option<String>,
    ) -> PyResult<Self> {
        let cluster_name = match cluster.as_deref() {
            Some(name) => name,
            None => "solana-devnet",
        };
        // The conversion stays inside each branch: the two constructors are
        // converted independently, exactly as before.
        let adaptor = if let Some(path) = keypair_path.as_deref() {
            into_py_result(PublicRpcSolanaProofLogAdaptor::from_keypair_path(
                &rpc_url,
                cluster_name,
                path,
            ))?
        } else {
            into_py_result(PublicRpcSolanaProofLogAdaptor::new(&rpc_url, cluster_name))?
        };
        Ok(Self { adaptor })
    }

    /// Return a copy of the configured cluster name.
    fn cluster(&self) -> String {
        let config = &self.adaptor.config;
        config.cluster.clone()
    }

    /// Return a copy of the configured memo program id.
    fn program_id(&self) -> String {
        let config = &self.adaptor.config;
        config.memo_program_id.clone()
    }

    /// Return a copy of the configured RPC URL.
    fn rpc_url(&self) -> String {
        let config = &self.adaptor.config;
        config.rpc_url.clone()
    }

    /// Return the signer public key reported by the wrapped adaptor.
    fn signer_pubkey(&self) -> String {
        self.adaptor.signer_pubkey()
    }

    /// Return the chain root as hex, or `None` when the adaptor has none.
    fn chain_root_hex(&self) -> Option<String> {
        let root = self.adaptor.optional_chain_root()?;
        Some(hash_to_hex(&root))
    }

    /// Log a publish proof and return the result as a JSON string.
    ///
    /// # Errors
    ///
    /// Propagates any error from `commit_publish_proof_inner`.
    #[pyo3(name = "commit_publish_proof")]
    fn commit_publish_proof(
        &self,
        publish_envelope_json: &str,
        committed_at: &str,
        committed_unix_seconds: i64,
    ) -> PyResult<String> {
        let serialized = self.commit_publish_proof_inner(
            publish_envelope_json,
            committed_at,
            committed_unix_seconds,
        )?;
        Ok(serialized)
    }

    /// Verify a publish envelope against a proof-log commitment, both given as JSON.
    ///
    /// # Errors
    ///
    /// Propagates any error from `verify_publish_proof_inner`.
    #[pyo3(name = "verify_publish_proof")]
    fn verify_publish_proof(
        &self,
        publish_envelope_json: &str,
        commitment_json: &str,
    ) -> PyResult<()> {
        self.verify_publish_proof_inner(publish_envelope_json, commitment_json)?;
        Ok(())
    }
}

/// Register the adaptor classes defined in this file on the Python module `m`.
///
/// `PyPublicRpcSolanaProofLogAdaptor` is added only when the
/// `python-proof-log-rpc` feature is enabled.
///
/// # Errors
///
/// Returns the first error reported by `add_class`.
pub(crate) fn register_adaptor_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyS3StorageAdaptor>()?;
    m.add_class::<PySolanaProofLogAdaptor>()?;
    #[cfg(feature = "python-proof-log-rpc")]
    m.add_class::<PyPublicRpcSolanaProofLogAdaptor>()?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::envelope::{Attestation, PublishEnvelope};
    use serde_json::Value;

    /// Build a minimal publish envelope fixture with all-zero hashes.
    fn publish() -> PublishEnvelope {
        PublishEnvelope {
            schema_version: "1.0.0".to_string(),
            envelope_type: "publish".to_string(),
            issued_at: "2026-01-01T00:00:00Z".to_string(),
            subject_id: "publish-1".to_string(),
            dataset_id: "sst".to_string(),
            dataset_version: "v1".to_string(),
            input_refs: vec!["obj://in".to_string()],
            output_refs: vec!["obj://out".to_string()],
            published_artifacts: vec![crate::checkpoint::CheckpointArtifact {
                artifact_id: "artifact-1".to_string(),
                content_root_hash:
                    "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
                content_descriptor_ref: None,
                content_descriptor_hash: None,
                media_type: "application/vnd+zarr".to_string(),
            }],
            primary_artifact_id: "artifact-1".to_string(),
            checkpoint_manifest_ref: "checkpoint://1".to_string(),
            checkpoint_manifest_hash:
                "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
            checkpoint_id: "checkpoint-1".to_string(),
            checkpoint_log_root_hash:
                "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
            lineage_refs: vec!["capture://1".to_string()],
            verification_policy_id: "verify-default".to_string(),
            attestations: vec![Attestation {
                signer_id: "s".to_string(),
                key_id: "k".to_string(),
                signature: "sig".to_string(),
                signed_at: "2026-01-01T00:00:00Z".to_string(),
            }],
            key_id: "key-1".to_string(),
            stac_refs: vec![],
            reward_context_ref: None,
            reward_context_hash: None,
            provenance_start_mode: "transport_capture".to_string(),
            bootstrap_origin_label: None,
            reward_eligible: false,
        }
    }

    /// Commit a publish envelope through the binding, read the proof-log
    /// account back by its locator, and check that the anchored checkpoint
    /// hash equals the envelope's checkpoint manifest hash.
    #[test]
    fn python_binding_proof_log_account_json_includes_checkpoint_hash() {
        let adaptor =
            PySolanaProofLogAdaptor::new("solana-testnet".to_string(), "program-1".to_string());
        // Bind the fixture once so the committed envelope and the expected
        // hash are guaranteed to come from the same value.
        let envelope = publish();
        let envelope_json =
            serde_json::to_string(&envelope).expect("serialize publish envelope for binding");
        let result_json = adaptor
            .commit_publish_proof(
                &envelope_json,
                &hex::encode([7u8; 32]),
                "2026-01-01T00:01:00Z",
                1_700_000_000,
                None,
                Some("attestor-key"),
            )
            .expect("commit publish envelope");
        let result: Value = serde_json::from_str(&result_json).expect("parse proof-log result");
        let pda = result
            .get("proof_log_receipt")
            .and_then(|value| value.get("locator"))
            .and_then(Value::as_str)
            .expect("proof-log account locator");
        let account_json = adaptor
            .get_proof_log_account_json(pda)
            .expect("query proof-log account")
            .expect("proof-log account exists");
        let account: Value = serde_json::from_str(&account_json).expect("parse proof-log account");
        // Deserialise straight into `Vec<u8>` so an element outside 0..=255
        // fails the test instead of being silently truncated by an `as u8` cast.
        let anchored_checkpoint_hash: Vec<u8> = account
            .get("anchored_checkpoint_hash")
            .cloned()
            .map(serde_json::from_value::<Vec<u8>>)
            .expect("anchored checkpoint hash array")
            .expect("checkpoint hash byte");
        assert_eq!(
            hex::encode(anchored_checkpoint_hash),
            envelope.checkpoint_manifest_hash
        );
    }
}