use super::util::{from_json, hash_to_hex, into_py_result, parse_hash32_hex, to_json};
use crate::python_facade as facade;
#[cfg(feature = "python-proof-log-rpc")]
use crate::python_facade::PublicRpcSolanaProofLogAdaptor;
use pyo3::prelude::*;
use pyo3::types::PyModule;
/// Python-visible wrapper (exported as `S3StorageAdaptor`) around
/// `facade::DeterministicS3StorageAdaptor`.
#[pyclass(name = "S3StorageAdaptor")]
pub(crate) struct PyS3StorageAdaptor {
/// Facade adaptor that every method of this class delegates to.
adaptor: facade::DeterministicS3StorageAdaptor,
}
#[pymethods]
impl PyS3StorageAdaptor {
#[new]
fn new(bucket: String, base_prefix: Option<String>) -> Self {
Self {
adaptor: facade::DeterministicS3StorageAdaptor::new(&bucket, base_prefix.as_deref()),
}
}
fn build_key(&self, object_name: &str) -> String {
self.adaptor.build_key(object_name)
}
fn build_uri(&self, key: &str) -> String {
self.adaptor.build_uri(key)
}
fn put_bytes_json(&self, key: &str, bytes: &[u8], content_type: &str) -> PyResult<String> {
let stored: facade::StoredObject = into_py_result(facade::StorageAdaptor::put_bytes(
&self.adaptor,
&facade::S3PutRequest {
key: key.to_string(),
bytes: bytes.to_vec(),
content_type: content_type.to_string(),
},
))?;
to_json("store bytes receipt", &stored)
}
}
/// Python-visible proof-log adaptor (exported as `SolanaProofLogAdaptor`)
/// that holds a `facade::SolanaClient`.
#[pyclass(name = "SolanaProofLogAdaptor")]
pub(crate) struct PySolanaProofLogAdaptor {
/// Client cloned into a fresh `facade::SolanaProofLogAdaptor` for each operation.
client: facade::SolanaClient,
}
/// Python-visible wrapper (exported as `PublicRpcSolanaProofLogAdaptor`) around
/// the facade's `PublicRpcSolanaProofLogAdaptor`.
///
/// Only compiled when the `python-proof-log-rpc` feature is enabled.
#[cfg(feature = "python-proof-log-rpc")]
#[pyclass(name = "PublicRpcSolanaProofLogAdaptor")]
pub(crate) struct PyPublicRpcSolanaProofLogAdaptor {
/// Facade adaptor that every method of this class delegates to.
adaptor: PublicRpcSolanaProofLogAdaptor,
}
impl PySolanaProofLogAdaptor {
    /// Rust-side implementation behind the Python `commit_publish_proof` method.
    ///
    /// Inputs are parsed in a fixed order — envelope JSON, attestor public key,
    /// then the optional previous entry hash — so the first malformed input is
    /// the one reported. The proof is logged through a freshly built
    /// `facade::SolanaProofLogAdaptor`, and the `facade::ProofLogPublishResult`
    /// is returned serialized with `to_json`.
    ///
    /// # Errors
    /// Returns a Python exception when an input fails to parse, when the
    /// facade rejects the publish, or when the result cannot be serialized.
    fn commit_publish_proof_inner(
        &self,
        publish_envelope_json: &str,
        attestor_pubkey_hex: &str,
        committed_at: &str,
        committed_unix_seconds: i64,
        prev_entry_hash_hex: Option<&str>,
        attestor_key_ref: Option<&str>,
    ) -> PyResult<String> {
        let publish_envelope: facade::PublishEnvelope = from_json(
            "parse publish envelope for local proof log",
            publish_envelope_json,
        )?;
        let attestor_pubkey = parse_hash32_hex(attestor_pubkey_hex)?.0;

        // Without an explicit key reference, the hex public key doubles as one.
        let key_ref = attestor_key_ref.unwrap_or(attestor_pubkey_hex);
        let log_adaptor =
            facade::SolanaProofLogAdaptor::new(self.client.clone(), attestor_pubkey, key_ref);

        // An absent previous hash is passed on as 32 zero bytes.
        let prev_entry_hash = match prev_entry_hash_hex {
            Some(hex_text) => parse_hash32_hex(hex_text)?.0,
            None => [0u8; 32],
        };

        let outcome = facade::ProofLogAdaptor::log_publish_proof(
            &log_adaptor,
            &publish_envelope,
            committed_at,
            committed_unix_seconds,
            prev_entry_hash,
        );
        let publish_result: facade::ProofLogPublishResult = into_py_result(outcome)?;
        to_json("serialize local proof-log result", &publish_result)
    }

    /// Rust-side implementation behind the Python `verify_publish_proof` method.
    ///
    /// Parses the envelope and the commitment, then hands both to
    /// `facade::ProofLogAdaptor::verify_publish_proof`.
    ///
    /// # Errors
    /// Returns a Python exception when either JSON document fails to parse or
    /// when the facade reports a verification failure.
    fn verify_publish_proof_inner(
        &self,
        publish_envelope_json: &str,
        commitment_json: &str,
    ) -> PyResult<()> {
        let publish_envelope: facade::PublishEnvelope = from_json(
            "parse publish envelope for local proof-log verification",
            publish_envelope_json,
        )?;
        let proof_commitment: facade::ProofLogCommitment = from_json(
            "parse proof-log commitment for local proof-log verification",
            commitment_json,
        )?;

        // The verifying adaptor is built with a zeroed attestor key and the
        // fixed "verify" key reference.
        let verifier =
            facade::SolanaProofLogAdaptor::new(self.client.clone(), [0u8; 32], "verify");
        let verdict = facade::ProofLogAdaptor::verify_publish_proof(
            &verifier,
            &publish_envelope,
            &proof_commitment,
        );
        into_py_result(verdict)
    }
}
#[pymethods]
impl PySolanaProofLogAdaptor {
    /// Initialise the underlying `facade::SolanaClient` from `cluster` and
    /// `program_id_value` (stored as the config's `program_id`).
    #[new]
    fn new(cluster: String, program_id_value: String) -> Self {
        Self {
            client: facade::init_solana_client(&facade::SolanaConfig {
                cluster,
                program_id: program_id_value,
            }),
        }
    }

    /// Return the cluster name reported by the facade for this client.
    fn cluster(&self) -> String {
        facade::solana_cluster_name(&self.client)
    }

    /// Return the program id reported by the facade for this client.
    fn program_id(&self) -> String {
        facade::solana_program_id(&self.client)
    }

    /// Return the adaptor's chain root, hex-encoded with `hash_to_hex`.
    ///
    /// # Errors
    /// Returns a Python exception when the facade cannot provide the chain root.
    fn chain_root_hex(&self) -> PyResult<String> {
        // Queries use a zeroed attestor key and the fixed "query" key reference.
        let adaptor = facade::SolanaProofLogAdaptor::new(self.client.clone(), [0u8; 32], "query");
        Ok(hash_to_hex(&into_py_result(adaptor.chain_root())?))
    }

    /// Log a publish proof and return the result serialized with `to_json`.
    ///
    /// `prev_entry_hash_hex` and `attestor_key_ref` may be omitted from Python;
    /// both then default to `None`. The defaults are spelled out in the
    /// signature rather than left to PyO3's implicit handling of trailing
    /// `Option` arguments. When `prev_entry_hash_hex` is `None`, 32 zero bytes
    /// are used; when `attestor_key_ref` is `None`, `attestor_pubkey_hex` is
    /// used as the key reference.
    ///
    /// # Errors
    /// Returns a Python exception when an input fails to parse, when the
    /// facade rejects the publish, or when the result cannot be serialized.
    #[pyo3(name = "commit_publish_proof")]
    #[pyo3(signature = (
        publish_envelope_json,
        attestor_pubkey_hex,
        committed_at,
        committed_unix_seconds,
        prev_entry_hash_hex=None,
        attestor_key_ref=None
    ))]
    fn commit_publish_proof(
        &self,
        publish_envelope_json: &str,
        attestor_pubkey_hex: &str,
        committed_at: &str,
        committed_unix_seconds: i64,
        prev_entry_hash_hex: Option<&str>,
        attestor_key_ref: Option<&str>,
    ) -> PyResult<String> {
        self.commit_publish_proof_inner(
            publish_envelope_json,
            attestor_pubkey_hex,
            committed_at,
            committed_unix_seconds,
            prev_entry_hash_hex,
            attestor_key_ref,
        )
    }

    /// Verify `commitment_json` against `publish_envelope_json`.
    ///
    /// # Errors
    /// Returns a Python exception when either JSON document fails to parse or
    /// when the facade reports a verification failure.
    #[pyo3(name = "verify_publish_proof")]
    fn verify_publish_proof(
        &self,
        publish_envelope_json: &str,
        commitment_json: &str,
    ) -> PyResult<()> {
        self.verify_publish_proof_inner(publish_envelope_json, commitment_json)
    }

    /// Look up the transaction for `signature`.
    ///
    /// Returns `None` when the facade has no such transaction; otherwise the
    /// transaction serialized with `to_json`.
    ///
    /// # Errors
    /// Returns a Python exception when the lookup or the serialization fails.
    fn get_transaction_json(&self, signature: &str) -> PyResult<Option<String>> {
        let adaptor = facade::SolanaProofLogAdaptor::new(self.client.clone(), [0u8; 32], "query");
        into_py_result(adaptor.get_transaction(signature))?
            .map(|value| to_json("serialize transaction result", &value))
            .transpose()
    }

    /// Look up the proof-log account addressed by `pda`.
    ///
    /// Returns `None` when the facade has no such account; otherwise the
    /// account serialized with `to_json`.
    ///
    /// # Errors
    /// Returns a Python exception when the lookup or the serialization fails.
    #[pyo3(name = "get_proof_log_account_json")]
    fn get_proof_log_account_json(&self, pda: &str) -> PyResult<Option<String>> {
        let adaptor = facade::SolanaProofLogAdaptor::new(self.client.clone(), [0u8; 32], "query");
        into_py_result(adaptor.get_proof_log_account(pda))?
            .map(|value| to_json("serialize proof-log account", &value))
            .transpose()
    }
}
#[cfg(feature = "python-proof-log-rpc")]
impl PyPublicRpcSolanaProofLogAdaptor {
    /// Rust-side implementation behind the Python `commit_publish_proof` method.
    ///
    /// Parses the envelope, logs it through the wrapped adaptor with a zeroed
    /// previous-entry hash, and returns the `facade::ProofLogPublishResult`
    /// serialized with `to_json`.
    ///
    /// # Errors
    /// Returns a Python exception when the envelope fails to parse, when the
    /// facade rejects the publish, or when the result cannot be serialized.
    fn commit_publish_proof_inner(
        &self,
        publish_envelope_json: &str,
        committed_at: &str,
        committed_unix_seconds: i64,
    ) -> PyResult<String> {
        let publish_envelope: facade::PublishEnvelope = from_json(
            "parse publish envelope for public RPC proof log",
            publish_envelope_json,
        )?;

        // This path always passes 32 zero bytes as the previous entry hash.
        let zeroed_prev_hash = [0u8; 32];
        let outcome = facade::ProofLogAdaptor::log_publish_proof(
            &self.adaptor,
            &publish_envelope,
            committed_at,
            committed_unix_seconds,
            zeroed_prev_hash,
        );
        let publish_result: facade::ProofLogPublishResult = into_py_result(outcome)?;
        to_json("serialize public RPC proof-log result", &publish_result)
    }

    /// Rust-side implementation behind the Python `verify_publish_proof` method.
    ///
    /// Parses the envelope and the commitment, then hands both to
    /// `facade::ProofLogAdaptor::verify_publish_proof` on the wrapped adaptor.
    ///
    /// # Errors
    /// Returns a Python exception when either JSON document fails to parse or
    /// when the facade reports a verification failure.
    fn verify_publish_proof_inner(
        &self,
        publish_envelope_json: &str,
        commitment_json: &str,
    ) -> PyResult<()> {
        let publish_envelope: facade::PublishEnvelope = from_json(
            "parse publish envelope for public RPC proof-log verification",
            publish_envelope_json,
        )?;
        let proof_commitment: facade::ProofLogCommitment = from_json(
            "parse proof-log commitment for public RPC proof-log verification",
            commitment_json,
        )?;

        let verdict = facade::ProofLogAdaptor::verify_publish_proof(
            &self.adaptor,
            &publish_envelope,
            &proof_commitment,
        );
        into_py_result(verdict)
    }
}
#[cfg(feature = "python-proof-log-rpc")]
#[pymethods]
impl PyPublicRpcSolanaProofLogAdaptor {
    /// Build the wrapped adaptor for `rpc_url`.
    ///
    /// `cluster` falls back to `"solana-devnet"` when omitted. When
    /// `keypair_path` is given the adaptor is created with
    /// `from_keypair_path`; otherwise the facade's plain constructor is used.
    ///
    /// # Errors
    /// Returns a Python exception when the selected facade constructor fails.
    #[new]
    #[pyo3(signature = (rpc_url, cluster=None, keypair_path=None))]
    fn new(
        rpc_url: String,
        cluster: Option<String>,
        keypair_path: Option<String>,
    ) -> PyResult<Self> {
        let cluster_name = cluster.as_deref().unwrap_or("solana-devnet");

        // Each constructor result is converted on its own branch, so the two
        // constructors are free to use different error types.
        let adaptor = if let Some(path) = keypair_path.as_deref() {
            into_py_result(PublicRpcSolanaProofLogAdaptor::from_keypair_path(
                &rpc_url,
                cluster_name,
                path,
            ))?
        } else {
            into_py_result(PublicRpcSolanaProofLogAdaptor::new(&rpc_url, cluster_name))?
        };

        Ok(Self { adaptor })
    }

    /// Return the `cluster` value from the adaptor's config.
    fn cluster(&self) -> String {
        let config = &self.adaptor.config;
        config.cluster.clone()
    }

    /// Return the `memo_program_id` value from the adaptor's config.
    fn program_id(&self) -> String {
        let config = &self.adaptor.config;
        config.memo_program_id.clone()
    }

    /// Return the `rpc_url` value from the adaptor's config.
    fn rpc_url(&self) -> String {
        let config = &self.adaptor.config;
        config.rpc_url.clone()
    }

    /// Return the signer public key reported by the wrapped adaptor.
    fn signer_pubkey(&self) -> String {
        self.adaptor.signer_pubkey()
    }

    /// Return the adaptor's chain root hex-encoded with `hash_to_hex`, or
    /// `None` when the adaptor has no chain root to report.
    fn chain_root_hex(&self) -> Option<String> {
        let root = self.adaptor.optional_chain_root()?;
        Some(hash_to_hex(&root))
    }

    /// Log a publish proof and return the result serialized with `to_json`.
    ///
    /// # Errors
    /// Returns a Python exception when the envelope fails to parse, when the
    /// facade rejects the publish, or when the result cannot be serialized.
    #[pyo3(name = "commit_publish_proof")]
    fn commit_publish_proof(
        &self,
        publish_envelope_json: &str,
        committed_at: &str,
        committed_unix_seconds: i64,
    ) -> PyResult<String> {
        self.commit_publish_proof_inner(publish_envelope_json, committed_at, committed_unix_seconds)
    }

    /// Verify `commitment_json` against `publish_envelope_json`.
    ///
    /// # Errors
    /// Returns a Python exception when either JSON document fails to parse or
    /// when the facade reports a verification failure.
    #[pyo3(name = "verify_publish_proof")]
    fn verify_publish_proof(
        &self,
        publish_envelope_json: &str,
        commitment_json: &str,
    ) -> PyResult<()> {
        self.verify_publish_proof_inner(publish_envelope_json, commitment_json)
    }
}
/// Add the adaptor classes defined in this file to the Python module `m`.
///
/// `PyPublicRpcSolanaProofLogAdaptor` is added only when the
/// `python-proof-log-rpc` feature is enabled.
///
/// # Errors
/// Propagates the first failure reported by `add_class`.
pub(crate) fn register_adaptor_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyS3StorageAdaptor>()?;
    m.add_class::<PySolanaProofLogAdaptor>()?;
    #[cfg(feature = "python-proof-log-rpc")]
    m.add_class::<PyPublicRpcSolanaProofLogAdaptor>()?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::envelope::{Attestation, PublishEnvelope};
    use serde_json::Value;

    /// Fixture: a publish envelope whose content, manifest and log-root hashes
    /// are all 64 hex zeros.
    fn publish() -> PublishEnvelope {
        PublishEnvelope {
            schema_version: "1.0.0".to_string(),
            envelope_type: "publish".to_string(),
            issued_at: "2026-01-01T00:00:00Z".to_string(),
            subject_id: "publish-1".to_string(),
            dataset_id: "sst".to_string(),
            dataset_version: "v1".to_string(),
            input_refs: vec!["obj://in".to_string()],
            output_refs: vec!["obj://out".to_string()],
            published_artifacts: vec![crate::checkpoint::CheckpointArtifact {
                artifact_id: "artifact-1".to_string(),
                content_root_hash:
                    "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
                content_descriptor_ref: None,
                content_descriptor_hash: None,
                media_type: "application/vnd+zarr".to_string(),
            }],
            primary_artifact_id: "artifact-1".to_string(),
            checkpoint_manifest_ref: "checkpoint://1".to_string(),
            checkpoint_manifest_hash:
                "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
            checkpoint_id: "checkpoint-1".to_string(),
            checkpoint_log_root_hash:
                "0000000000000000000000000000000000000000000000000000000000000000".to_string(),
            lineage_refs: vec!["capture://1".to_string()],
            verification_policy_id: "verify-default".to_string(),
            attestations: vec![Attestation {
                signer_id: "s".to_string(),
                key_id: "k".to_string(),
                signature: "sig".to_string(),
                signed_at: "2026-01-01T00:00:00Z".to_string(),
            }],
            key_id: "key-1".to_string(),
            stac_refs: vec![],
            reward_context_ref: None,
            reward_context_hash: None,
            provenance_start_mode: "transport_capture".to_string(),
            bootstrap_origin_label: None,
            reward_eligible: false,
        }
    }

    /// Commits the fixture envelope through the binding, reads back the
    /// proof-log account named by the receipt locator, and checks that its
    /// anchored checkpoint hash equals the envelope's manifest hash.
    #[test]
    fn python_binding_proof_log_account_json_includes_checkpoint_hash() {
        let adaptor =
            PySolanaProofLogAdaptor::new("solana-testnet".to_string(), "program-1".to_string());
        // Build the fixture once so the committed and the expected values
        // provably come from the same envelope.
        let envelope = publish();
        let envelope_json =
            serde_json::to_string(&envelope).expect("serialize publish envelope for binding");
        let result_json = adaptor
            .commit_publish_proof(
                &envelope_json,
                &hex::encode([7u8; 32]),
                "2026-01-01T00:01:00Z",
                1_700_000_000,
                None,
                Some("attestor-key"),
            )
            .expect("commit publish envelope");
        let result: Value = serde_json::from_str(&result_json).expect("parse proof-log result");
        let pda = result
            .get("proof_log_receipt")
            .and_then(|value| value.get("locator"))
            .and_then(Value::as_str)
            .expect("proof-log account locator");
        let account_json = adaptor
            .get_proof_log_account_json(pda)
            .expect("query proof-log account")
            .expect("proof-log account exists");
        let account: Value = serde_json::from_str(&account_json).expect("parse proof-log account");
        let anchored_checkpoint_hash = account
            .get("anchored_checkpoint_hash")
            .and_then(Value::as_array)
            .expect("anchored checkpoint hash array")
            .iter()
            .map(|value| {
                let byte = value.as_u64().expect("checkpoint hash byte");
                // A checked conversion: an `as u8` cast would wrap values such
                // as 256 to 0 and still match the all-zero expected hash.
                u8::try_from(byte).expect("checkpoint hash byte fits in u8")
            })
            .collect::<Vec<_>>();
        assert_eq!(
            hex::encode(anchored_checkpoint_hash),
            envelope.checkpoint_manifest_hash
        );
    }
}