use super::util::{from_json, into_py_result, parse_verification_mode, to_json};
use crate::python_facade as facade;
use pyo3::prelude::*;
use pyo3::types::PyModule;
use pyo3::wrap_pyfunction;
use serde::de::DeserializeOwned;
/// Produces an attestation over `payload` for the given signer.
///
/// Forwards all four arguments unchanged to `facade::make_attestation` and converts
/// the facade result into a `PyResult` via `into_py_result`.
fn sign_envelope(
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
    payload: &[u8],
) -> PyResult<facade::Attestation> {
    let outcome = facade::make_attestation(signer_id, signing_key_hex, signed_at, payload);
    into_py_result(outcome)
}
/// Operations the signing helpers in this file need from an envelope type.
///
/// Implementors must also be `serde::Serialize` because the finished envelope is
/// serialized to JSON after signing.
trait SignedEnvelope: serde::Serialize {
/// Returns the bytes that are handed to the signer for this envelope.
fn canonical_attestation_payload_bytes(&self) -> Vec<u8>;
/// Installs `attestation` on the envelope.
fn replace_attestation(&mut self, attestation: facade::Attestation);
/// Validates the envelope, reporting a failure as a Python error; `operation` is
/// passed along into that error.
fn validate_for_python(&self, operation: &'static str) -> PyResult<()>;
}
/// Builder input that can be deserialized from JSON and then stamped with a key id.
trait SignedEnvelopeInput: DeserializeOwned {
/// Stores `key_id` on the input.
fn set_key_id(&mut self, key_id: String);
}
/// Operation-name strings used for the three fallible steps of building one kind of
/// envelope.
#[derive(Clone, Copy)]
struct EnvelopeJsonOperations {
// Passed to `from_json` when parsing the input JSON.
parse: &'static str,
// Passed to `to_json` when serializing the finished envelope.
serialize: &'static str,
// Passed to `SignedEnvelope::validate_for_python`.
validate: &'static str,
}
/// Borrowed signing parameters that are forwarded together to the signing helpers.
#[derive(Clone, Copy)]
struct SigningInputs<'a> {
// Identifier of the signer, copied into the attestation.
signer_id: &'a str,
// Signing key; presumably hex-encoded, judging by the name — confirm against the facade.
signing_key_hex: &'a str,
// Timestamp string recorded on the attestation; format is not checked in this file.
signed_at: &'a str,
}
/// Lets the envelope builder set the key id on capture session input.
impl SignedEnvelopeInput for facade::CaptureSessionInput {
// Overwrites the input's `key_id` field with the supplied value.
fn set_key_id(&mut self, key_id: String) {
self.key_id = key_id;
}
}
/// Lets the envelope builder set the key id on transform stage input.
impl SignedEnvelopeInput for facade::TransformStageInput {
// Overwrites the input's `key_id` field with the supplied value.
fn set_key_id(&mut self, key_id: String) {
self.key_id = key_id;
}
}
/// Lets the envelope builder set the key id on publish input.
impl SignedEnvelopeInput for facade::PublishInput {
// Overwrites the input's `key_id` field with the supplied value.
fn set_key_id(&mut self, key_id: String) {
self.key_id = key_id;
}
}
impl SignedEnvelope for facade::CaptureEnvelope {
fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
self.canonical_attestation_payload_bytes()
}
fn replace_attestation(&mut self, attestation: facade::Attestation) {
self.key_id = attestation.key_id.clone();
self.attestations = vec![attestation];
}
fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
self.validate().map_err(|errs| {
super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
})
}
}
impl SignedEnvelope for facade::TransformEnvelope {
fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
self.canonical_attestation_payload_bytes()
}
fn replace_attestation(&mut self, attestation: facade::Attestation) {
self.key_id = attestation.key_id.clone();
self.attestations = vec![attestation];
}
fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
self.validate().map_err(|errs| {
super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
})
}
}
impl SignedEnvelope for facade::PublishEnvelope {
fn canonical_attestation_payload_bytes(&self) -> Vec<u8> {
self.canonical_attestation_payload_bytes()
}
fn replace_attestation(&mut self, attestation: facade::Attestation) {
self.key_id = attestation.key_id.clone();
self.attestations = vec![attestation];
}
fn validate_for_python(&self, operation: &'static str) -> PyResult<()> {
self.validate().map_err(|errs| {
super::util::py_err(crate::error::TrazaeoError::validation(operation, errs))
})
}
}
/// Derives the key id for `signing_key_hex` and builds an unsigned placeholder
/// attestation carrying it.
///
/// Returns `(key_id, attestation)`, where the attestation holds the given signer id
/// and timestamp, the derived key id, and an empty signature.
///
/// # Errors
///
/// Propagates any error raised while signing the empty probe payload.
fn prepare_seed_attestation(
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<(String, facade::Attestation)> {
    // The empty payload is signed only to learn the key id; the signature is discarded.
    let derived_key_id = sign_envelope(signer_id, signing_key_hex, signed_at, b"")?.key_id;

    let unsigned_seed = facade::Attestation {
        signer_id: signer_id.to_owned(),
        key_id: derived_key_id.clone(),
        signature: String::new(),
        signed_at: signed_at.to_owned(),
    };

    Ok((derived_key_id, unsigned_seed))
}
/// Returns the key id associated with `signing_key_hex`.
///
/// Signs an empty payload using a fixed placeholder signer id and timestamp, then
/// keeps only the `key_id` of the resulting attestation.
///
/// # Errors
///
/// Propagates any error raised while signing.
#[pyfunction]
pub(crate) fn key_id_from_signing_key_hex(signing_key_hex: &str) -> PyResult<String> {
    sign_envelope(
        "key-id-derivation",
        signing_key_hex,
        "1970-01-01T00:00:00Z",
        b"",
    )
    .map(|attestation| attestation.key_id)
}
/// Signs `envelope`, validates it, and serializes it to a JSON string.
///
/// The payload bytes are taken from the envelope as passed in, before the freshly
/// produced attestation is installed on it.
///
/// # Errors
///
/// Returns the first error from signing, validation (reported under
/// `validate_operation`), or serialization (reported under `serialize_operation`).
fn finalize_signed_envelope_json<E: SignedEnvelope>(
    serialize_operation: &'static str,
    validate_operation: &'static str,
    mut envelope: E,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let signed = {
        let canonical_bytes = envelope.canonical_attestation_payload_bytes();
        sign_envelope(signer_id, signing_key_hex, signed_at, &canonical_bytes)?
    };

    envelope.replace_attestation(signed);
    envelope.validate_for_python(validate_operation)?;

    to_json(serialize_operation, &envelope)
}
fn build_signed_envelope_json<I, E>(
operations: EnvelopeJsonOperations,
input_json: &str,
signing: SigningInputs<'_>,
builder: impl FnOnce(I, facade::Attestation) -> E,
) -> PyResult<String>
where
I: SignedEnvelopeInput,
E: SignedEnvelope,
{
let mut input: I = from_json(operations.parse, input_json)?;
let (key_id, unsigned_seed_attestation) = prepare_seed_attestation(
signing.signer_id,
signing.signing_key_hex,
signing.signed_at,
)?;
input.set_key_id(key_id);
let envelope = builder(input, unsigned_seed_attestation);
finalize_signed_envelope_json(
operations.serialize,
operations.validate,
envelope,
signing.signer_id,
signing.signing_key_hex,
signing.signed_at,
)
}
/// Verifies a publish envelope against a trust policy and returns the report as JSON.
///
/// Inputs are processed in a fixed order: the envelope JSON, then the verification
/// mode string, then the trust policy JSON. The first one that fails to parse
/// determines the error.
///
/// # Errors
///
/// Returns an error if any input fails to parse or the report cannot be serialized.
fn verify_publish_envelope_with_policy_json(
    publish_envelope_json: &str,
    mode: &str,
    trust_policy_json: &str,
) -> PyResult<String> {
    let published: facade::PublishEnvelope =
        from_json("parse publish envelope", publish_envelope_json)?;
    let verification_mode = parse_verification_mode(mode)?;
    let trust_policy: facade::TrustPolicy = from_json("parse trust policy", trust_policy_json)?;

    to_json(
        "serialize verification report",
        &facade::verify_publish_envelope(&published, verification_mode, &trust_policy),
    )
}
/// Builds a signed capture envelope and returns it as a JSON string.
///
/// `messages_json` is parsed into captured segments first; the session input is then
/// parsed, stamped with the derived key id, combined with the segments into an
/// envelope, signed, validated and serialized.
///
/// # Errors
///
/// Returns an error if either JSON input fails to parse, or if signing, validation
/// or serialization fails.
#[pyfunction]
pub(crate) fn build_capture_envelope_json(
    session_input_json: &str,
    messages_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let segments: Vec<facade::CapturedSegment> =
        from_json("parse captured segments", messages_json)?;

    let operations = EnvelopeJsonOperations {
        parse: "parse capture session input",
        serialize: "serialize capture envelope",
        validate: "validate capture envelope",
    };
    let signing = SigningInputs {
        signer_id,
        signing_key_hex,
        signed_at,
    };

    build_signed_envelope_json(operations, session_input_json, signing, |input, seed| {
        facade::build_capture_envelope(&input, &segments, seed)
    })
}
/// Builds a signed transform envelope and returns it as a JSON string.
///
/// The transform input is parsed, stamped with the derived key id, turned into an
/// envelope, signed, validated and serialized.
///
/// # Errors
///
/// Returns an error if the input JSON fails to parse, or if signing, validation or
/// serialization fails.
#[pyfunction]
pub(crate) fn build_transform_envelope_json(
    transform_input_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let operations = EnvelopeJsonOperations {
        parse: "parse transform input",
        serialize: "serialize transform envelope",
        validate: "validate transform envelope",
    };
    let signing = SigningInputs {
        signer_id,
        signing_key_hex,
        signed_at,
    };

    build_signed_envelope_json(operations, transform_input_json, signing, |input, seed| {
        facade::build_transform_envelope(&input, seed)
    })
}
/// Builds a signed publish envelope and returns it as a JSON string.
///
/// The publish input is parsed, stamped with the derived key id, turned into an
/// envelope, signed, validated and serialized.
///
/// # Errors
///
/// Returns an error if the input JSON fails to parse, or if signing, validation or
/// serialization fails.
#[pyfunction]
pub(crate) fn build_publish_envelope_json(
    publish_input_json: &str,
    signer_id: &str,
    signing_key_hex: &str,
    signed_at: &str,
) -> PyResult<String> {
    let operations = EnvelopeJsonOperations {
        parse: "parse publish input",
        serialize: "serialize publish envelope",
        validate: "validate publish envelope",
    };
    let signing = SigningInputs {
        signer_id,
        signing_key_hex,
        signed_at,
    };

    build_signed_envelope_json(operations, publish_input_json, signing, |input, seed| {
        facade::build_publish_envelope(&input, seed)
    })
}
/// Verifies a publish envelope against a trust policy and returns the verification
/// report as a JSON string.
///
/// Python-facing entry point; all work is delegated to
/// `verify_publish_envelope_with_policy_json` with the arguments passed through
/// unchanged.
#[pyfunction]
pub(crate) fn verify_publish_envelope_json(
publish_envelope_json: &str,
mode: &str,
trust_policy_json: &str,
) -> PyResult<String> {
verify_publish_envelope_with_policy_json(publish_envelope_json, mode, trust_policy_json)
}
/// Adds every envelope-related `#[pyfunction]` defined in this file to the Python
/// module `m`.
///
/// Registration stops at the first failure and returns that error.
pub(crate) fn register_envelope_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Key-id derivation helper.
m.add_function(wrap_pyfunction!(key_id_from_signing_key_hex, m)?)?;
// Envelope builders.
m.add_function(wrap_pyfunction!(build_capture_envelope_json, m)?)?;
m.add_function(wrap_pyfunction!(build_transform_envelope_json, m)?)?;
m.add_function(wrap_pyfunction!(build_publish_envelope_json, m)?)?;
// Envelope verification.
m.add_function(wrap_pyfunction!(verify_publish_envelope_json, m)?)?;
Ok(())
}