#![allow(clippy::useless_conversion)]
use crate::approval::{
ApprovalMetadata as RustApprovalMetadata, ApprovalPayload as RustApprovalPayload,
SignedApproval as RustSignedApproval,
};
use crate::constraints::{
All, Any, CelConstraint, Cidr, Constraint, ConstraintValue, Contains, Exact, Not, NotOneOf,
OneOf, Pattern, Range, RegexConstraint, Subpath, Subset, UrlPattern, UrlSafe, Wildcard,
};
use crate::crypto::{
PublicKey as RustPublicKey, Signature as RustSignature, SigningKey as RustSigningKey,
};
use crate::diff::{
ChangeType as RustChangeType, ClearanceDiff as RustClearanceDiff,
ConstraintDiff as RustConstraintDiff, DelegationDiff as RustDelegationDiff,
DelegationReceipt as RustDelegationReceipt, DepthDiff as RustDepthDiff,
ToolsDiff as RustToolsDiff, TtlDiff as RustTtlDiff,
};
use crate::mcp::{CompiledMcpConfig, McpConfig};
use crate::planes::{
Authorizer as RustAuthorizer, ChainStep as RustChainStep,
ChainVerificationResult as RustChainVerificationResult,
};
use crate::warrant::{
Clearance, OwnedAttenuationBuilder, OwnedIssuanceBuilder, Warrant as RustWarrant, WarrantType,
};
use crate::wire;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PySequence, PyTuple};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
/// Converts a core [`crate::error::Error`] into a Python exception (`PyErr`).
///
/// Acquires the GIL, imports the Python module `tenuo.exceptions`, selects an
/// exception class name plus positional constructor arguments based on the
/// error variant, and instantiates that class.
///
/// Fallbacks visible in this function:
/// - if `tenuo.exceptions` cannot be imported, the import error itself is returned;
/// - `InvalidPath` bypasses the class lookup and returns a built-in `ValueError`;
/// - if building the argument tuple, looking up the class, or calling the class
///   fails, a `RuntimeError` carrying the failure text is returned instead.
fn to_py_err(e: crate::error::Error) -> PyErr {
Python::with_gil(|py| {
// Every mapped exception class is resolved by name on this Python module.
let exceptions = match py.import("tenuo.exceptions") {
Ok(m) => m,
// Import failed: surface the import error itself.
Err(e) => return e,
};
// Choose the Python class name and its positional args for each variant.
// `args` is the still-fallible result of building the tuple; it is unwrapped
// once, after the match.
let (exc_name, args) = match &e {
crate::error::Error::SignatureInvalid(m) => {
("SignatureInvalid", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::MissingSignature(m) => {
("MissingSignature", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::CryptoError(m) => ("CryptoError", PyTuple::new(py, [m.as_str()])),
crate::error::Error::WarrantRevoked(id) => {
("RevokedError", PyTuple::new(py, [id.as_str()]))
}
crate::error::Error::WarrantExpired(t) => {
// The first argument is the fixed placeholder "unknown"; only the
// RFC 3339 timestamp comes from the error value.
(
"ExpiredError",
PyTuple::new(py, ["unknown", t.to_rfc3339().as_str()]),
)
}
crate::error::Error::DepthExceeded(d, m) => {
("DepthExceeded", PyTuple::new(py, [*d, *m]))
}
crate::error::Error::InvalidWarrantId(m) => {
("InvalidWarrantId", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::InvalidTtl(m) => ("InvalidTtl", PyTuple::new(py, [m.as_str()])),
crate::error::Error::ConstraintDepthExceeded { depth, max } => {
("ConstraintDepthExceeded", PyTuple::new(py, [*depth, *max]))
}
crate::error::Error::PayloadTooLarge { size, max } => {
("PayloadTooLarge", PyTuple::new(py, [*size, *max]))
}
// No payload: the exception is constructed with an empty argument tuple.
crate::error::Error::ParentRequired => ("ParentRequired", Ok(PyTuple::empty(py))),
crate::error::Error::ToolMismatch { parent, child } => (
"ToolMismatch",
PyTuple::new(py, [parent.as_str(), child.as_str()]),
),
crate::error::Error::MonotonicityViolation(m) => {
("MonotonicityError", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::IncompatibleConstraintTypes {
parent_type,
child_type,
} => (
"IncompatibleConstraintTypes",
PyTuple::new(py, [parent_type.as_str(), child_type.as_str()]),
),
crate::error::Error::WildcardExpansion { parent_type } => (
"WildcardExpansion",
PyTuple::new(py, [parent_type.as_str()]),
),
// `count` is passed to Python as a string, not as a number.
crate::error::Error::EmptyResultSet { parent_type, count } => (
"EmptyResultSet",
PyTuple::new(py, [parent_type.as_str(), &count.to_string()]),
), crate::error::Error::ExclusionRemoved { value } => {
("ExclusionRemoved", PyTuple::new(py, [value.as_str()]))
}
crate::error::Error::ValueNotInParentSet { value } => {
("ValueNotInParentSet", PyTuple::new(py, [value.as_str()]))
}
// Both range values are stringified before being handed to Python.
crate::error::Error::RangeExpanded {
bound,
parent_value,
child_value,
} => (
"RangeExpanded",
PyTuple::new(
py,
[
bound.as_str(),
&parent_value.to_string(),
&child_value.to_string(),
],
),
),
crate::error::Error::PatternExpanded { parent, child } => (
"PatternExpanded",
PyTuple::new(py, [parent.as_str(), child.as_str()]),
),
crate::error::Error::RequiredValueRemoved { value } => {
("RequiredValueRemoved", PyTuple::new(py, [value.as_str()]))
}
crate::error::Error::ExactValueMismatch { parent, child } => (
"ExactValueMismatch",
PyTuple::new(py, [parent.as_str(), child.as_str()]),
),
// Reported through the generic ValidationError class with a fixed message.
crate::error::Error::IssuedInFuture => (
"ValidationError",
PyTuple::new(py, ["Warrant issued in the future (check system clock)"]),
),
crate::error::Error::ConstraintNotSatisfied { field, reason } => (
"ConstraintViolation",
PyTuple::new(py, [field.as_str(), reason.as_str()]),
),
crate::error::Error::InvalidPattern(m) => {
("InvalidPattern", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::InvalidRange(m) => {
("InvalidRange", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::InvalidRegex(m) => {
("InvalidRegex", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::CelError(m) => ("CelError", PyTuple::new(py, [m.as_str()])),
crate::error::Error::SerializationError(m) => {
("SerializationError", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::DeserializationError(m) => {
("DeserializationError", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::UnsupportedVersion(v) => {
("UnsupportedVersion", PyTuple::new(py, [*v]))
}
crate::error::Error::MissingField(m) => {
("MissingField", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::ChainVerificationFailed(m) => {
("ChainError", PyTuple::new(py, [m.as_str()]))
} crate::error::Error::Validation(m) => {
("ValidationError", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::Unauthorized(m) => {
("Unauthorized", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::ApprovalExpired {
approved_at,
expired_at,
} => (
"ApprovalExpired",
PyTuple::new(
py,
[
approved_at.to_rfc3339().as_str(),
expired_at.to_rfc3339().as_str(),
],
),
),
crate::error::Error::InsufficientApprovals {
required,
received,
ref detail,
} => {
// A missing detail is passed as the empty string rather than None.
let detail_str = detail.as_deref().unwrap_or("");
// Mixed element types, so each is converted to a Python object first.
let elements: [pyo3::PyObject; 3] = [
required.into_py(py),
received.into_py(py),
detail_str.into_py(py),
];
("InsufficientApprovals", PyTuple::new(py, elements))
}
crate::error::Error::InvalidApproval(m) => {
("InvalidApproval", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::ApprovalRequired { ref tool, ref request } => {
// The request hash and id are hex-encoded; min_approvals is stringified.
let request_hash_hex = hex::encode(request.request_hash);
let request_id_hex = hex::encode(request.request_id);
(
"ApprovalGateTriggered",
PyTuple::new(
py,
[
tool.as_str(),
request_id_hex.as_str(),
request_hash_hex.as_str(),
&request.min_approvals.to_string(),
],
),
)
}
crate::error::Error::UnknownProvider(m) => {
("UnknownProvider", PyTuple::new(py, [m.as_str()]))
}
crate::error::Error::ClearanceLevelExceeded { requested, limit } => (
"ClearanceLevelExceeded",
PyTuple::new(py, [requested.as_str(), limit.as_str()]),
),
// `allowed` is passed as its Rust Debug rendering.
crate::error::Error::UnauthorizedToolIssuance { tool, allowed } => (
"UnauthorizedToolIssuance",
PyTuple::new(
py,
[
tool.as_str(),
&format!("{:?}", allowed), ],
),
),
crate::error::Error::SelfIssuanceProhibited { reason } => (
"SelfIssuanceProhibited",
PyTuple::new(py, [reason.as_str()]),
),
crate::error::Error::IssueDepthExceeded { depth, max } => {
("IssueDepthExceeded", PyTuple::new(py, [*depth, *max]))
}
crate::error::Error::InvalidWarrantType { message } => {
("InvalidWarrantType", PyTuple::new(py, [message.as_str()]))
}
crate::error::Error::IssuerChainTooLong { length, max } => {
("IssuerChainTooLong", PyTuple::new(py, [*length, *max]))
}
// Reuses the RangeExpanded class. The two inclusivity flags are ignored and
// both string arguments are derived from the same `value`.
// NOTE(review): the labels "(inclusivity)" and "(inclusive)" are asymmetric —
// confirm the first one is the intended wording.
crate::error::Error::RangeInclusivityExpanded {
bound,
value,
parent_inclusive: _,
child_inclusive: _,
} => (
"RangeExpanded",
PyTuple::new(
py,
[
bound.as_str(),
&format!("{} (inclusivity)", value),
&format!("{} (inclusive)", value),
],
),
),
// Also reuses RangeExpanded, with the literal "value" in the bound position
// and the Debug-formatted min/max in the parent position.
crate::error::Error::ValueNotInRange { value, min, max } => (
"RangeExpanded",
PyTuple::new(
py,
["value", &format!("{:?}-{:?}", min, max), &value.to_string()],
),
),
// The CIDR / IP / URL variants below have no dedicated Python class here:
// they are folded into ValidationError, ConstraintViolation or
// MonotonicityError with a preformatted message.
crate::error::Error::InvalidCidr { cidr, reason } => (
"ValidationError",
PyTuple::new(py, [&format!("Invalid CIDR '{}': {}", cidr, reason)]),
),
crate::error::Error::InvalidIpAddress { ip, reason } => (
"ValidationError",
PyTuple::new(py, [&format!("Invalid IP address '{}': {}", ip, reason)]),
),
crate::error::Error::IpNotInCidr { ip, cidr } => (
"ConstraintViolation",
PyTuple::new(
py,
["source_ip", &format!("IP '{}' not in CIDR '{}'", ip, cidr)],
),
),
crate::error::Error::CidrNotSubnet { parent, child } => (
"MonotonicityError",
PyTuple::new(
py,
[&format!("CIDR '{}' is not a subnet of '{}'", child, parent)],
),
),
crate::error::Error::InvalidUrl { url, reason } => (
"ValidationError",
PyTuple::new(py, [&format!("Invalid URL '{}': {}", url, reason)]),
),
crate::error::Error::UrlSchemeExpanded { parent, child } => (
"MonotonicityError",
PyTuple::new(
py,
[&format!(
"URL scheme '{}' not allowed by parent scheme '{}'",
child, parent
)],
),
),
crate::error::Error::UrlHostExpanded { parent, child } => (
"MonotonicityError",
PyTuple::new(
py,
[&format!(
"URL host '{}' not allowed by parent host '{}'",
child, parent
)],
),
),
crate::error::Error::UrlPortExpanded { parent, child } => (
"MonotonicityError",
PyTuple::new(
py,
[&format!(
"URL port '{:?}' not allowed by parent port '{:?}'",
child, parent
)],
),
),
crate::error::Error::UrlPathExpanded { parent, child } => (
"MonotonicityError",
PyTuple::new(
py,
[&format!(
"URL path '{}' not allowed by parent path '{}'",
child, parent
)],
),
),
crate::error::Error::UrlMismatch { reason } => (
"ConstraintViolation",
PyTuple::new(py, ["url", reason.as_str()]),
),
crate::error::Error::DelegationAuthorityError { expected, actual } => (
"DelegationAuthorityError",
PyTuple::new(py, [expected.as_str(), actual.as_str()]),
),
// Surfaced as the generic Unauthorized class with a formatted message.
crate::error::Error::InsufficientClearance {
tool,
required,
actual,
} => (
"Unauthorized",
PyTuple::new(
py,
[&format!(
"insufficient clearance for tool '{}': requires {}, has {}",
tool, required, actual
)],
),
),
crate::error::Error::ConfigurationError(msg) => (
"ConfigurationError",
PyTuple::new(py, [msg.as_str()]),
),
// NOTE(review): "RuntimeError" is resolved on tenuo.exceptions like every
// other name in this table — confirm that module exposes it; otherwise the
// getattr fallback at the end of this function is what produces the error.
crate::error::Error::FeatureNotEnabled { feature } => (
"RuntimeError",
PyTuple::new(
py,
[&format!(
"{} requires the '{}' feature. Enable with: tenuo = {{ features = [\"{}\"] }}",
feature, feature, feature
)],
),
),
crate::error::Error::PathNotContained { path, root } => (
"ConstraintViolation",
PyTuple::new(
py,
[
"path",
&format!("path '{}' not contained in root '{}'", path, root),
],
),
),
// The only variant that bypasses tenuo.exceptions: it returns a built-in
// ValueError directly from inside the match.
crate::error::Error::InvalidPath { path, reason } => {
return PyValueError::new_err(format!("invalid path '{}': {}", path, reason));
}
crate::error::Error::UrlNotSafe { url, reason } => (
"ConstraintViolation",
PyTuple::new(
py,
[
"url",
&format!("URL '{}' blocked: {}", url, reason),
],
),
),
};
// Tuple construction is fallible; report a failure as a RuntimeError.
let args = match args {
Ok(a) => a,
Err(e) => {
return PyRuntimeError::new_err(format!("Failed to create args tuple: {}", e))
}
};
// Look the class up by name and instantiate it. If either step fails, a
// RuntimeError carrying the failure text is returned instead.
match exceptions.getattr(exc_name) {
Ok(cls) => {
PyErr::from_value(cls.call1(args).unwrap_or_else(|e| {
PyRuntimeError::new_err(e.to_string())
.value(py)
.as_any()
.clone()
}))
}
Err(e) => PyRuntimeError::new_err(e.to_string()),
}
})
}
/// Converts a gateway [`crate::gateway_config::ConfigError`] into a Python exception.
///
/// Tries to raise `tenuo.exceptions.ConfigurationError` built from the error's
/// display text. If the module cannot be imported, the class cannot be found,
/// or the class cannot be instantiated, a built-in `ValueError` with the same
/// text is produced instead.
fn config_err_to_py(e: crate::gateway_config::ConfigError) -> PyErr {
    Python::with_gil(|py| {
        let message = e.to_string();

        // Resolve the exception class; any failure on the way degrades to ValueError.
        let exc_class = match py
            .import("tenuo.exceptions")
            .and_then(|module| module.getattr("ConfigurationError"))
        {
            Ok(cls) => cls,
            Err(_) => return PyValueError::new_err(message),
        };

        let built = exc_class.call1((message.as_str(),));
        match built {
            Ok(instance) => PyErr::from_value(instance),
            Err(_) => PyErr::from_value(
                PyValueError::new_err(message)
                    .value(py)
                    .as_any()
                    .clone(),
            ),
        }
    })
}
/// Wrapper around the core [`Pattern`] constraint, exposed to Python as `Pattern`.
#[pyclass(name = "Pattern")]
#[derive(Clone)]
pub struct PyPattern {
    inner: Pattern,
}

#[pymethods]
impl PyPattern {
    /// Builds a pattern constraint; a core construction error is raised as the
    /// mapped Python exception.
    #[new]
    fn new(pattern: &str) -> PyResult<Self> {
        Pattern::new(pattern)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// The pattern string stored in the wrapped constraint.
    #[getter]
    fn pattern(&self) -> String {
        self.inner.pattern.clone()
    }

    /// Evaluates the constraint against a string value.
    fn matches(&self, value: &str) -> PyResult<bool> {
        let candidate = ConstraintValue::String(value.to_owned());
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyPattern) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        format!("Pattern('{}')", self.inner.pattern)
    }
}
/// Wrapper around the core [`Exact`] constraint, exposed to Python as `Exact`.
#[pyclass(name = "Exact")]
#[derive(Clone)]
pub struct PyExact {
    inner: Exact,
}

#[pymethods]
impl PyExact {
    /// Builds an exact-value constraint from a string.
    #[new]
    fn new(value: &str) -> Self {
        let inner = Exact::new(value);
        Self { inner }
    }

    /// The stored value converted to a Python object via `constraint_value_to_py`.
    #[getter]
    fn value(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| constraint_value_to_py(py, &self.inner.value))
    }

    /// True only when the stored value has a string view equal to `value`.
    fn matches(&self, value: &str) -> bool {
        matches!(self.inner.value.as_str(), Some(stored) if stored == value)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        format!("Exact('{}')", self.inner.value)
    }
}
/// Wrapper around the core [`OneOf`] constraint, exposed to Python as `OneOf`.
#[pyclass(name = "OneOf")]
#[derive(Clone)]
pub struct PyOneOf {
    inner: OneOf,
}

#[pymethods]
impl PyOneOf {
    /// Builds the constraint from a list of strings.
    #[new]
    fn new(values: Vec<String>) -> Self {
        let inner = OneOf::new(values);
        Self { inner }
    }

    /// The stored values as a Python list, each converted via `constraint_value_to_py`.
    #[getter]
    fn values(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| {
            let py_values = pyo3::types::PyList::empty(py);
            for member in self.inner.values.iter() {
                let converted = constraint_value_to_py(py, member)?;
                py_values.append(converted)?;
            }
            Ok(py_values.into())
        })
    }

    /// Asks the wrapped constraint whether it contains the given string value.
    fn contains(&self, value: &str) -> bool {
        self.inner
            .contains(&ConstraintValue::String(value.to_owned()))
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyOneOf) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        format!("OneOf({:?})", self.inner.values)
    }
}
/// Wrapper around the core [`NotOneOf`] constraint, exposed to Python as `NotOneOf`.
#[pyclass(name = "NotOneOf")]
#[derive(Clone)]
pub struct PyNotOneOf {
    inner: NotOneOf,
}

#[pymethods]
impl PyNotOneOf {
    /// Builds the constraint from a list of excluded strings.
    #[new]
    fn new(values: Vec<String>) -> Self {
        let inner = NotOneOf::new(values);
        Self { inner }
    }

    /// The excluded values, each converted to a Python object.
    #[getter]
    fn excluded(&self) -> PyResult<Vec<PyObject>> {
        Python::with_gil(|py| {
            let mut converted = Vec::new();
            for entry in self.inner.excluded.iter() {
                converted.push(constraint_value_to_py(py, entry)?);
            }
            Ok(converted)
        })
    }

    /// True when the given string is not among the excluded values.
    fn allows(&self, value: &str) -> bool {
        let candidate = ConstraintValue::String(value.to_owned());
        let is_excluded = self.inner.excluded.contains(&candidate);
        !is_excluded
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyNotOneOf) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        format!("NotOneOf({:?})", self.inner.excluded)
    }
}
#[pyclass(name = "Contains")]
#[derive(Clone)]
pub struct PyContains {
inner: Contains,
}
#[pymethods]
impl PyContains {
#[new]
fn new(values: Vec<PyObject>) -> PyResult<Self> {
let rust_values = Python::with_gil(|py| -> PyResult<Vec<ConstraintValue>> {
let mut vec = Vec::new();
for obj in values {
let bound = obj.into_bound(py);
vec.push(py_to_constraint_value(&bound)?);
}
Ok(vec)
})?;
Ok(Self {
inner: Contains::new(rust_values),
})
}
fn validate_attenuation(&self, child: &PyContains) -> PyResult<()> {
self.inner
.validate_attenuation(&child.inner)
.map_err(to_py_err)
}
fn matches(&self, value: Vec<PyObject>) -> PyResult<bool> {
let rust_values = Python::with_gil(|py| -> PyResult<Vec<ConstraintValue>> {
let mut vec = Vec::new();
for obj in value {
let bound = obj.into_bound(py);
vec.push(py_to_constraint_value(&bound)?);
}
Ok(vec)
})?;
let cv = ConstraintValue::List(rust_values);
self.inner.matches(&cv).map_err(to_py_err)
}
fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
let cv = py_to_constraint_value(value)?;
self.inner.matches(&cv).map_err(to_py_err)
}
#[getter]
fn required(&self) -> PyResult<Vec<PyObject>> {
Python::with_gil(|py| {
self.inner
.required
.iter()
.map(|v| constraint_value_to_py(py, v))
.collect()
})
}
fn __repr__(&self) -> String {
format!("Contains({:?})", self.inner.required)
}
}
#[pyclass(name = "Subset")]
#[derive(Clone)]
pub struct PySubset {
inner: Subset,
}
#[pymethods]
impl PySubset {
#[new]
fn new(values: Vec<PyObject>) -> PyResult<Self> {
let rust_values = Python::with_gil(|py| -> PyResult<Vec<ConstraintValue>> {
let mut vec = Vec::new();
for obj in values {
let bound = obj.into_bound(py);
vec.push(py_to_constraint_value(&bound)?);
}
Ok(vec)
})?;
Ok(Self {
inner: Subset::new(rust_values),
})
}
fn validate_attenuation(&self, child: &PySubset) -> PyResult<()> {
self.inner
.validate_attenuation(&child.inner)
.map_err(to_py_err)
}
fn matches(&self, value: Vec<PyObject>) -> PyResult<bool> {
let rust_values = Python::with_gil(|py| -> PyResult<Vec<ConstraintValue>> {
let mut vec = Vec::new();
for obj in value {
let bound = obj.into_bound(py);
vec.push(py_to_constraint_value(&bound)?);
}
Ok(vec)
})?;
let cv = ConstraintValue::List(rust_values);
self.inner.matches(&cv).map_err(to_py_err)
}
fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
let cv = py_to_constraint_value(value)?;
self.inner.matches(&cv).map_err(to_py_err)
}
#[getter]
fn allowed(&self) -> PyResult<Vec<PyObject>> {
Python::with_gil(|py| {
self.inner
.allowed
.iter()
.map(|v| constraint_value_to_py(py, v))
.collect()
})
}
fn __repr__(&self) -> String {
format!("Subset({:?})", self.inner.allowed)
}
}
/// Wrapper around the core [`All`] composite constraint, exposed to Python as `All`.
#[pyclass(name = "All")]
#[derive(Clone)]
pub struct PyAll {
    inner: All,
}

#[pymethods]
impl PyAll {
    /// Builds the composite from Python constraint objects; conversion stops at
    /// the first object that `py_to_constraint` rejects.
    #[new]
    fn new(constraints: Vec<PyObject>) -> PyResult<Self> {
        let members = Python::with_gil(|py| {
            constraints
                .into_iter()
                .map(|item| py_to_constraint(&item.into_bound(py)))
                .collect::<PyResult<Vec<Constraint>>>()
        })?;
        Ok(Self {
            inner: All::new(members),
        })
    }

    /// Evaluates the composite against a string value.
    fn matches(&self, value: &str) -> PyResult<bool> {
        let candidate = ConstraintValue::String(value.to_owned());
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the composite against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyAll) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    /// Fixed placeholder text; the member constraints are not rendered.
    fn __repr__(&self) -> String {
        String::from("All(...)")
    }
}
/// Wrapper around the core [`Any`] composite constraint, exposed to Python as `AnyOf`.
#[pyclass(name = "AnyOf")]
#[derive(Clone)]
pub struct PyAnyOf {
    inner: Any,
}

#[pymethods]
impl PyAnyOf {
    /// Builds the composite from Python constraint objects; conversion stops at
    /// the first object that `py_to_constraint` rejects.
    #[new]
    fn new(constraints: Vec<PyObject>) -> PyResult<Self> {
        let members = Python::with_gil(|py| {
            constraints
                .into_iter()
                .map(|item| py_to_constraint(&item.into_bound(py)))
                .collect::<PyResult<Vec<Constraint>>>()
        })?;
        Ok(Self {
            inner: Any::new(members),
        })
    }

    /// Evaluates the composite against a string value.
    fn matches(&self, value: &str) -> PyResult<bool> {
        let candidate = ConstraintValue::String(value.to_owned());
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the composite against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Fixed placeholder text; the member constraints are not rendered.
    fn __repr__(&self) -> String {
        String::from("AnyOf(...)")
    }
}
/// Wrapper around the core [`Not`] constraint, exposed to Python as `Not`.
#[pyclass(name = "Not")]
#[derive(Clone)]
pub struct PyNot {
    inner: Not,
}

#[pymethods]
impl PyNot {
    /// Builds the constraint from a single Python constraint object, converted
    /// with `py_to_constraint`.
    #[new]
    fn new(constraint: PyObject) -> PyResult<Self> {
        let wrapped = Python::with_gil(|py| py_to_constraint(&constraint.into_bound(py)))?;
        Ok(Self {
            inner: Not::new(wrapped),
        })
    }

    /// Evaluates the constraint against a string value.
    fn matches(&self, value: &str) -> PyResult<bool> {
        let candidate = ConstraintValue::String(value.to_owned());
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Fixed placeholder text; the wrapped constraint is not rendered.
    fn __repr__(&self) -> String {
        String::from("Not(...)")
    }
}
/// Wrapper around the core [`Range`] constraint, exposed to Python as `Range`.
#[pyclass(name = "Range")]
#[derive(Clone)]
pub struct PyRange {
    inner: Range,
}

#[pymethods]
impl PyRange {
    /// Builds a range from optional bounds; both default to `None`.
    #[new]
    #[pyo3(signature = (min=None, max=None))]
    fn new(min: Option<f64>, max: Option<f64>) -> PyResult<Self> {
        Range::new(min, max)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// Builds a range through the core `Range::max` constructor.
    #[staticmethod]
    fn max_value(max: f64) -> PyResult<Self> {
        Range::max(max)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// Builds a range through the core `Range::min` constructor.
    #[staticmethod]
    fn min_value(min: f64) -> PyResult<Self> {
        Range::min(min)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// The stored lower bound, if any.
    #[getter]
    fn min(&self) -> Option<f64> {
        self.inner.min
    }

    /// The stored upper bound, if any.
    #[getter]
    fn max(&self) -> Option<f64> {
        self.inner.max
    }

    /// Asks the wrapped range whether it contains the given number.
    fn contains(&self, value: f64) -> bool {
        self.inner.contains_value(value)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyRange) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    /// Renders both bounds with their Rust `Debug` formatting.
    fn __repr__(&self) -> String {
        let (lower, upper) = (self.inner.min, self.inner.max);
        format!("Range(min={:?}, max={:?})", lower, upper)
    }
}
/// Wrapper around the core [`Cidr`] constraint, exposed to Python as `Cidr`.
#[pyclass(name = "Cidr")]
#[derive(Clone)]
pub struct PyCidr {
    inner: Cidr,
}

#[pymethods]
impl PyCidr {
    /// Builds the constraint from a CIDR string; a core construction error is
    /// raised as the mapped Python exception.
    #[new]
    fn new(cidr: &str) -> PyResult<Self> {
        Cidr::new(cidr)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// The stored CIDR string.
    #[getter]
    fn network(&self) -> String {
        self.inner.cidr_string.clone()
    }

    /// Asks the wrapped constraint whether it contains the given IP string.
    fn contains(&self, ip: &str) -> PyResult<bool> {
        let verdict = self.inner.contains_ip(ip);
        verdict.map_err(to_py_err)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyCidr) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        format!("Cidr('{}')", self.inner.cidr_string)
    }

    /// The stored CIDR string, unquoted.
    fn __str__(&self) -> String {
        self.inner.cidr_string.clone()
    }
}
/// Wrapper around the core [`UrlPattern`] constraint, exposed to Python as `UrlPattern`.
#[pyclass(name = "UrlPattern")]
#[derive(Clone)]
pub struct PyUrlPattern {
    inner: UrlPattern,
}

#[pymethods]
impl PyUrlPattern {
    /// Builds the constraint from a pattern string; a core construction error
    /// is raised as the mapped Python exception.
    #[new]
    fn new(pattern: &str) -> PyResult<Self> {
        UrlPattern::new(pattern)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// The stored pattern string.
    #[getter]
    fn pattern(&self) -> String {
        self.inner.pattern.clone()
    }

    /// The stored scheme list.
    #[getter]
    fn schemes(&self) -> Vec<String> {
        self.inner.schemes.clone()
    }

    /// The stored host pattern, if any.
    #[getter]
    fn host_pattern(&self) -> Option<String> {
        self.inner.host_pattern.clone()
    }

    /// The stored port, if any.
    #[getter]
    fn port(&self) -> Option<u16> {
        self.inner.port
    }

    /// The stored path pattern, if any.
    #[getter]
    fn path_pattern(&self) -> Option<String> {
        self.inner.path_pattern.clone()
    }

    /// Evaluates the pattern against a URL string via the core `matches_url`.
    fn matches(&self, url: &str) -> PyResult<bool> {
        let verdict = self.inner.matches_url(url);
        verdict.map_err(to_py_err)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyUrlPattern) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        format!("UrlPattern('{}')", self.inner.pattern)
    }

    /// The stored pattern string, unquoted.
    fn __str__(&self) -> String {
        self.inner.pattern.clone()
    }
}
/// Wrapper around the core [`CelConstraint`], exposed to Python as `CEL`.
#[pyclass(name = "CEL")]
#[derive(Clone)]
pub struct PyCel {
    inner: CelConstraint,
}

#[pymethods]
impl PyCel {
    /// Builds the constraint from an expression string.
    #[new]
    fn new(expression: &str) -> Self {
        let inner = CelConstraint::new(expression);
        Self { inner }
    }

    /// The stored expression string.
    #[getter]
    fn expression(&self) -> String {
        self.inner.expression.clone()
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn matches(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Alias of [`PyCel::matches`].
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        self.matches(value)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyCel) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        format!("CEL('{}')", self.inner.expression)
    }
}
/// Wrapper around the core [`RegexConstraint`], exposed to Python as `Regex`.
#[pyclass(name = "Regex")]
#[derive(Clone)]
pub struct PyRegex {
    inner: RegexConstraint,
}

#[pymethods]
impl PyRegex {
    /// Builds the constraint from a pattern string; a core construction error
    /// is raised as the mapped Python exception.
    #[new]
    fn new(pattern: &str) -> PyResult<Self> {
        RegexConstraint::new(pattern)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// The stored pattern string.
    #[getter]
    fn pattern(&self) -> String {
        self.inner.pattern.clone()
    }

    /// Evaluates the constraint against a string value.
    fn matches(&self, value: &str) -> PyResult<bool> {
        let candidate = ConstraintValue::String(value.to_owned());
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyRegex) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        format!("Regex('{}')", self.inner.pattern)
    }
}
/// Wrapper around the core [`Wildcard`] constraint, exposed to Python as `Wildcard`.
#[pyclass(name = "Wildcard")]
#[derive(Clone)]
pub struct PyWildcard {
    inner: Wildcard,
}

#[pymethods]
impl PyWildcard {
    #[new]
    fn new() -> Self {
        let inner = Wildcard;
        Self { inner }
    }

    /// Always returns `True`; the optional argument is ignored.
    #[pyo3(signature = (_value=None))]
    fn matches(&self, _value: Option<&str>) -> bool {
        true
    }

    /// Always returns `True`; the argument is not inspected.
    fn satisfies(&self, _value: &Bound<'_, PyAny>) -> PyResult<bool> {
        Ok(true)
    }

    /// Converts `child` (any Python constraint object) with `py_to_constraint`
    /// and runs the core attenuation check with this wildcard as the parent.
    fn validate_attenuation(&self, child: &Bound<'_, PyAny>) -> PyResult<()> {
        let child_constraint = py_to_constraint(child)?;
        let parent = Constraint::Wildcard(self.inner.clone());
        parent
            .validate_attenuation(&child_constraint)
            .map_err(to_py_err)
    }

    fn __repr__(&self) -> String {
        String::from("Wildcard()")
    }
}
/// Wrapper around the core [`Subpath`] constraint, exposed to Python as `Subpath`.
#[pyclass(name = "Subpath")]
#[derive(Clone)]
pub struct PySubpath {
    inner: Subpath,
}

#[pymethods]
impl PySubpath {
    /// Builds the constraint through `Subpath::with_options`; both flags
    /// default to `True` on the Python side.
    #[new]
    #[pyo3(signature = (root, case_sensitive=true, allow_equal=true))]
    fn new(root: &str, case_sensitive: bool, allow_equal: bool) -> PyResult<Self> {
        Subpath::with_options(root, case_sensitive, allow_equal)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// The stored root path.
    #[getter]
    fn root(&self) -> String {
        self.inner.root.clone()
    }

    /// The stored `case_sensitive` flag.
    #[getter]
    fn case_sensitive(&self) -> bool {
        self.inner.case_sensitive
    }

    /// The stored `allow_equal` flag.
    #[getter]
    fn allow_equal(&self) -> bool {
        self.inner.allow_equal
    }

    /// Asks the wrapped constraint whether it contains the given path string.
    fn contains(&self, path: &str) -> PyResult<bool> {
        let verdict = self.inner.contains_path(path);
        verdict.map_err(to_py_err)
    }

    /// Alias of [`PySubpath::contains`].
    fn matches(&self, path: &str) -> PyResult<bool> {
        self.contains(path)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PySubpath) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    /// Short form when both flags are `True`; otherwise both flags are spelled
    /// out using Python's `True`/`False` literals.
    fn __repr__(&self) -> String {
        let py_bool = |flag: bool| if flag { "True" } else { "False" };
        let (case_sensitive, allow_equal) = (self.inner.case_sensitive, self.inner.allow_equal);

        if !(case_sensitive && allow_equal) {
            return format!(
                "Subpath('{}', case_sensitive={}, allow_equal={})",
                self.inner.root,
                py_bool(case_sensitive),
                py_bool(allow_equal)
            );
        }
        format!("Subpath('{}')", self.inner.root)
    }

    /// Always the short form, regardless of the flags.
    fn __str__(&self) -> String {
        format!("Subpath('{}')", self.inner.root)
    }
}
/// Wrapper around the core [`UrlSafe`] constraint, exposed to Python as `UrlSafe`.
#[pyclass(name = "UrlSafe")]
#[derive(Clone)]
pub struct PyUrlSafe {
    inner: UrlSafe,
}

#[pymethods]
impl PyUrlSafe {
    /// Builds the constraint from keyword options.
    ///
    /// When `allow_schemes` is omitted the scheme list is `["http", "https"]`.
    /// The other defaults are the ones declared in the signature below.
    #[new]
    #[pyo3(signature = (
        allow_schemes=None,
        allow_domains=None,
        deny_domains=None,
        allow_ports=None,
        block_private=true,
        block_loopback=true,
        block_metadata=true,
        block_reserved=true,
        block_internal_tlds=false
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        allow_schemes: Option<Vec<String>>,
        allow_domains: Option<Vec<String>>,
        deny_domains: Option<Vec<String>>,
        allow_ports: Option<Vec<u16>>,
        block_private: bool,
        block_loopback: bool,
        block_metadata: bool,
        block_reserved: bool,
        block_internal_tlds: bool,
    ) -> Self {
        let schemes = match allow_schemes {
            Some(explicit) => explicit,
            None => vec![String::from("http"), String::from("https")],
        };
        let inner = UrlSafe {
            schemes,
            allow_domains,
            deny_domains,
            allow_ports,
            block_private,
            block_loopback,
            block_metadata,
            block_reserved,
            block_internal_tlds,
        };
        Self { inner }
    }

    /// The stored scheme list.
    #[getter]
    fn schemes(&self) -> Vec<String> {
        self.inner.schemes.clone()
    }

    /// The stored domain allow-list, if any.
    #[getter]
    fn allow_domains(&self) -> Option<Vec<String>> {
        self.inner.allow_domains.clone()
    }

    /// The stored domain deny-list, if any.
    #[getter]
    fn deny_domains(&self) -> Option<Vec<String>> {
        self.inner.deny_domains.clone()
    }

    /// The stored port allow-list, if any.
    #[getter]
    fn allow_ports(&self) -> Option<Vec<u16>> {
        self.inner.allow_ports.clone()
    }

    #[getter]
    fn block_private(&self) -> bool {
        self.inner.block_private
    }

    #[getter]
    fn block_loopback(&self) -> bool {
        self.inner.block_loopback
    }

    #[getter]
    fn block_metadata(&self) -> bool {
        self.inner.block_metadata
    }

    #[getter]
    fn block_reserved(&self) -> bool {
        self.inner.block_reserved
    }

    #[getter]
    fn block_internal_tlds(&self) -> bool {
        self.inner.block_internal_tlds
    }

    /// Runs the core `is_safe` check on a URL string.
    fn is_safe(&self, url: &str) -> PyResult<bool> {
        let verdict = self.inner.is_safe(url);
        verdict.map_err(to_py_err)
    }

    /// Alias of [`PyUrlSafe::is_safe`].
    fn matches(&self, url: &str) -> PyResult<bool> {
        self.is_safe(url)
    }

    /// Converts an arbitrary Python value with `py_to_constraint_value` and
    /// evaluates the constraint against it.
    fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
        let candidate = py_to_constraint_value(value)?;
        self.inner.matches(&candidate).map_err(to_py_err)
    }

    /// Delegates to the core attenuation check with `child`'s wrapped constraint.
    fn validate_attenuation(&self, child: &PyUrlSafe) -> PyResult<()> {
        let outcome = self.inner.validate_attenuation(&child.inner);
        outcome.map_err(to_py_err)
    }

    /// Lists only the options that differ from the constructor defaults, in
    /// constructor-argument order.
    fn __repr__(&self) -> String {
        let cfg = &self.inner;
        let mut parts: Vec<String> = Vec::new();

        if cfg.schemes != ["http", "https"] {
            parts.push(format!("allow_schemes={:?}", cfg.schemes));
        }
        if let Some(domains) = cfg.allow_domains.as_ref() {
            parts.push(format!("allow_domains={:?}", domains));
        }
        if let Some(domains) = cfg.deny_domains.as_ref() {
            parts.push(format!("deny_domains={:?}", domains));
        }
        if let Some(ports) = cfg.allow_ports.as_ref() {
            parts.push(format!("allow_ports={:?}", ports));
        }

        // Flags that default to True are reported only when switched off.
        let default_on = [
            ("block_private", cfg.block_private),
            ("block_loopback", cfg.block_loopback),
            ("block_metadata", cfg.block_metadata),
            ("block_reserved", cfg.block_reserved),
        ];
        for (name, enabled) in default_on.iter() {
            if !*enabled {
                parts.push(format!("{}=False", name));
            }
        }
        // The one flag that defaults to False is reported only when switched on.
        if cfg.block_internal_tlds {
            parts.push(String::from("block_internal_tlds=True"));
        }

        if parts.is_empty() {
            return String::from("UrlSafe()");
        }
        format!("UrlSafe({})", parts.join(", "))
    }

    /// Same text as `__repr__`.
    fn __str__(&self) -> String {
        self.__repr__()
    }
}
#[pyclass(name = "Shlex")]
#[derive(Clone)]
pub struct PyShlex {
pub(crate) inner: crate::constraints::Shlex,
}
#[pymethods]
impl PyShlex {
#[new]
#[pyo3(signature = (allow))]
fn new(allow: Vec<String>) -> PyResult<Self> {
if allow.is_empty() {
return Err(pyo3::exceptions::PyValueError::new_err(
"Shlex requires at least one allowed binary",
));
}
Ok(Self {
inner: crate::constraints::Shlex { allow },
})
}
fn matches(&self, command: &str) -> PyResult<bool> {
let cv = ConstraintValue::String(command.to_string());
self.inner.matches(&cv).map_err(to_py_err)
}
fn satisfies(&self, value: &Bound<'_, PyAny>) -> PyResult<bool> {
let cv = py_to_constraint_value(value)?;
self.inner.matches(&cv).map_err(to_py_err)
}
fn __repr__(&self) -> String {
format!("Shlex(allow={:?})", self.inner.allow)
}
fn __str__(&self) -> String {
self.__repr__()
}
#[getter]
fn allow(&self) -> Vec<String> {
self.inner.allow.clone()
}
fn validate_attenuation(&self, child: &PyShlex) -> PyResult<()> {
self.inner
.validate_attenuation(&child.inner)
.map_err(to_py_err)
}
}
/// Wrapper around the core signing key, exposed to Python as `SigningKey`.
#[pyclass(name = "SigningKey")]
pub struct PySigningKey {
    pub(crate) inner: RustSigningKey,
}

#[pymethods]
impl PySigningKey {
    /// Calling the class with no arguments generates a fresh key, exactly like
    /// `SigningKey.generate()`.
    #[new]
    fn new() -> Self {
        Self::generate()
    }

    /// Generates a fresh key via the core `generate` constructor.
    #[staticmethod]
    fn generate() -> Self {
        let inner = RustSigningKey::generate();
        Self { inner }
    }

    /// Builds a key from raw bytes; anything other than exactly 32 bytes is
    /// rejected with `ValueError`.
    #[staticmethod]
    fn from_bytes(bytes: &[u8]) -> PyResult<Self> {
        match <[u8; 32]>::try_from(bytes) {
            Ok(seed) => Ok(Self {
                inner: RustSigningKey::from_bytes(&seed),
            }),
            Err(_) => Err(PyValueError::new_err(
                "secret key must be exactly 32 bytes",
            )),
        }
    }

    /// Builds a key from PEM text; a core parse error is raised as the mapped
    /// Python exception.
    #[staticmethod]
    fn from_pem(pem: &str) -> PyResult<Self> {
        RustSigningKey::from_pem(pem)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// The public half of this key as a `PublicKey` wrapper.
    #[getter]
    fn public_key(&self) -> PyPublicKey {
        let inner = self.inner.public_key();
        PyPublicKey { inner }
    }

    /// The public key's raw bytes.
    fn public_key_bytes(&self) -> Vec<u8> {
        let public = self.inner.public_key();
        public.to_bytes().to_vec()
    }

    /// The secret key's raw bytes.
    fn secret_key_bytes(&self) -> Vec<u8> {
        self.inner.secret_key_bytes().to_vec()
    }

    /// The key serialized by the core `to_pem`.
    fn to_pem(&self) -> String {
        self.inner.to_pem()
    }
}
fn constraint_to_py(py: Python<'_>, constraint: &Constraint) -> PyResult<PyObject> {
match constraint {
Constraint::Pattern(p) =>
{
#[allow(deprecated)]
Ok(PyPattern { inner: p.clone() }.into_py(py))
}
Constraint::Exact(e) =>
{
#[allow(deprecated)]
Ok(PyExact { inner: e.clone() }.into_py(py))
}
Constraint::OneOf(o) =>
{
#[allow(deprecated)]
Ok(PyOneOf { inner: o.clone() }.into_py(py))
}
Constraint::NotOneOf(n) =>
{
#[allow(deprecated)]
Ok(PyNotOneOf { inner: n.clone() }.into_py(py))
}
Constraint::Unknown { .. } => Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Unknown constraint type encountered (not supported in Python bindings)",
)),
Constraint::Range(r) =>
{
#[allow(deprecated)]
Ok(PyRange { inner: r.clone() }.into_py(py))
}
Constraint::Cidr(c) =>
{
#[allow(deprecated)]
Ok(PyCidr { inner: c.clone() }.into_py(py))
}
Constraint::UrlPattern(u) =>
{
#[allow(deprecated)]
Ok(PyUrlPattern { inner: u.clone() }.into_py(py))
}
Constraint::Contains(c) =>
{
#[allow(deprecated)]
Ok(PyContains { inner: c.clone() }.into_py(py))
}
Constraint::Subset(s) =>
{
#[allow(deprecated)]
Ok(PySubset { inner: s.clone() }.into_py(py))
}
Constraint::All(a) => {
let py_constraints = Python::with_gil(|py| -> PyResult<Vec<PyObject>> {
let mut vec = Vec::new();
for c in &a.constraints {
vec.push(constraint_to_py(py, c)?);
}
Ok(vec)
})?;
#[allow(deprecated)]
Ok(PyAll::new(py_constraints)?.into_py(py))
}
Constraint::Any(a) => {
let py_constraints = Python::with_gil(|py| -> PyResult<Vec<PyObject>> {
let mut vec = Vec::new();
for c in &a.constraints {
vec.push(constraint_to_py(py, c)?);
}
Ok(vec)
})?;
#[allow(deprecated)]
Ok(PyAnyOf::new(py_constraints)?.into_py(py))
}
Constraint::Not(n) => {
let py_constraint = Python::with_gil(|py| -> PyResult<PyObject> {
constraint_to_py(py, &n.constraint)
})?;
#[allow(deprecated)]
Ok(PyNot::new(py_constraint)?.into_py(py))
}
Constraint::Cel(c) =>
{
#[allow(deprecated)]
Ok(PyCel { inner: c.clone() }.into_py(py))
}
Constraint::Wildcard(w) =>
{
#[allow(deprecated)]
Ok(PyWildcard { inner: w.clone() }.into_py(py))
}
Constraint::Regex(r) =>
{
#[allow(deprecated)]
Ok(PyRegex { inner: r.clone() }.into_py(py))
}
Constraint::Subpath(s) =>
{
#[allow(deprecated)]
Ok(PySubpath { inner: s.clone() }.into_py(py))
}
Constraint::UrlSafe(u) =>
{
#[allow(deprecated)]
Ok(PyUrlSafe { inner: u.clone() }.into_py(py))
}
Constraint::Shlex(sh) =>
{
#[allow(deprecated)]
Ok(PyShlex { inner: sh.clone() }.into_py(py))
}
}
}
fn py_to_constraint(obj: &Bound<'_, PyAny>) -> PyResult<Constraint> {
if let Ok(p) = obj.extract::<PyPattern>() {
Ok(Constraint::Pattern(p.inner))
} else if let Ok(e) = obj.extract::<PyExact>() {
Ok(Constraint::Exact(e.inner))
} else if let Ok(o) = obj.extract::<PyOneOf>() {
Ok(Constraint::OneOf(o.inner))
} else if let Ok(n) = obj.extract::<PyNotOneOf>() {
Ok(Constraint::NotOneOf(n.inner))
} else if let Ok(r) = obj.extract::<PyRange>() {
Ok(Constraint::Range(r.inner))
} else if let Ok(c) = obj.extract::<PyCidr>() {
Ok(Constraint::Cidr(c.inner))
} else if let Ok(u) = obj.extract::<PyUrlPattern>() {
Ok(Constraint::UrlPattern(u.inner))
} else if let Ok(c) = obj.extract::<PyContains>() {
Ok(Constraint::Contains(c.inner))
} else if let Ok(s) = obj.extract::<PySubset>() {
Ok(Constraint::Subset(s.inner))
} else if let Ok(a) = obj.extract::<PyAll>() {
Ok(Constraint::All(a.inner))
} else if let Ok(a) = obj.extract::<PyAnyOf>() {
Ok(Constraint::Any(a.inner))
} else if let Ok(n) = obj.extract::<PyNot>() {
Ok(Constraint::Not(n.inner))
} else if let Ok(c) = obj.extract::<PyCel>() {
Ok(Constraint::Cel(c.inner))
} else if let Ok(r) = obj.extract::<PyRegex>() {
Ok(Constraint::Regex(r.inner))
} else if let Ok(w) = obj.extract::<PyWildcard>() {
Ok(Constraint::Wildcard(w.inner))
} else if let Ok(s) = obj.extract::<PySubpath>() {
Ok(Constraint::Subpath(s.inner))
} else if let Ok(u) = obj.extract::<PyUrlSafe>() {
Ok(Constraint::UrlSafe(u.inner))
} else if let Ok(sh) = obj.extract::<PyShlex>() {
Ok(Constraint::Shlex(sh.inner))
} else {
Err(PyValueError::new_err(
"constraint must be Pattern, Exact, OneOf, NotOneOf, Range, Cidr, UrlPattern, Contains, Subset, All, AnyOf, Not, CEL, Regex, Wildcard, Subpath, UrlSafe, or Shlex",
))
}
}
/// Reserved dict key that toggles the constraint set's allow-unknown flag
/// instead of naming a field.
const ALLOW_UNKNOWN_KEY: &str = "_allow_unknown";

/// Build a `ConstraintSet` from a Python dict of `field -> constraint`.
///
/// The `_allow_unknown` key is special: its value must be a boolean and is
/// passed to `set_allow_unknown`. Every other value is converted with
/// `py_to_constraint`.
fn py_dict_to_constraint_set(
    constraints: &Bound<'_, PyDict>,
) -> PyResult<crate::constraints::ConstraintSet> {
    let mut set = crate::constraints::ConstraintSet::new();
    for (key, value) in constraints.iter() {
        let name: String = key.extract()?;
        if name != ALLOW_UNKNOWN_KEY {
            let constraint = py_to_constraint(&value)?;
            set.insert(name, constraint);
            continue;
        }
        let flag = match value.extract::<bool>() {
            Ok(flag) => flag,
            Err(_) => {
                return Err(PyValueError::new_err("_allow_unknown must be a boolean"));
            }
        };
        set.set_allow_unknown(flag);
    }
    Ok(set)
}
/// Convert a Python value into a `ConstraintValue`.
///
/// Supported inputs are `str`, `bool`, `int`, `float`, and sequences of
/// those (converted recursively into `ConstraintValue::List`).
///
/// `bool` is tested before `int`: Python's `bool` is a subclass of `int`, so
/// an integer extraction would accept `True`/`False` and they would be
/// stored as `Integer(1)`/`Integer(0)` instead of `Boolean`.
///
/// # Errors
/// Raises `ValueError` when the value is none of the supported types.
fn py_to_constraint_value(obj: &Bound<'_, PyAny>) -> PyResult<ConstraintValue> {
    if let Ok(s) = obj.extract::<String>() {
        Ok(ConstraintValue::String(s))
    } else if let Ok(b) = obj.extract::<bool>() {
        // Must precede the i64 branch; see the function docs.
        Ok(ConstraintValue::Boolean(b))
    } else if let Ok(i) = obj.extract::<i64>() {
        Ok(ConstraintValue::Integer(i))
    } else if let Ok(f) = obj.extract::<f64>() {
        Ok(ConstraintValue::Float(f))
    } else if let Ok(items) = obj.extract::<Vec<PyObject>>() {
        let py = obj.py();
        let values = items
            .into_iter()
            .map(|item| py_to_constraint_value(&item.into_bound(py)))
            .collect::<PyResult<Vec<_>>>()?;
        Ok(ConstraintValue::List(values))
    } else {
        Err(PyValueError::new_err(
            "value must be str, int, float, bool, or list",
        ))
    }
}
fn constraint_value_to_py(py: Python<'_>, value: &ConstraintValue) -> PyResult<PyObject> {
match value {
ConstraintValue::String(s) => Ok(s.to_object(py)),
ConstraintValue::Integer(i) => Ok(i.to_object(py)),
ConstraintValue::Float(f) => Ok(f.to_object(py)),
ConstraintValue::Boolean(b) => Ok(b.to_object(py)),
ConstraintValue::List(l) => {
let py_list: Vec<PyObject> = l
.iter()
.map(|v| constraint_value_to_py(py, v))
.collect::<PyResult<Vec<_>>>()?;
Ok(py_list.to_object(py))
}
ConstraintValue::Object(o) => {
let dict = pyo3::types::PyDict::new(py);
for (k, v) in o {
dict.set_item(k, constraint_value_to_py(py, v)?)?;
}
Ok(dict.to_object(py))
}
ConstraintValue::Null => Ok(py.None()),
}
}
fn py_to_arg_approval_gate(
arg_name: &str,
av: &Bound<'_, PyAny>,
) -> PyResult<crate::approval_gate::ArgApprovalGate> {
use crate::approval_gate::ArgApprovalGate;
if av.is_none() {
return Ok(ArgApprovalGate::All);
}
if let Ok(d) = av.downcast::<PyDict>() {
if d.len() != 1 {
return Err(pyo3::exceptions::PyValueError::new_err(format!(
"arg gate '{}': approval gate dict must have exactly one key ('exempt')",
arg_name
)));
}
let inner = d.get_item("exempt")?.ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err(format!(
"arg gate '{}': approval gate dict must have key 'exempt'",
arg_name
))
})?;
let constraint = py_to_constraint(&inner)?;
return ArgApprovalGate::exempt(constraint).map_err(|e| {
pyo3::exceptions::PyValueError::new_err(format!(
"arg gate '{}': invalid Exempt inner constraint — {}",
arg_name, e
))
});
}
let constraint = py_to_constraint(av)?;
Ok(ArgApprovalGate::Constraint(constraint))
}
fn py_dict_to_approval_gate_map(
dict: &Bound<'_, PyDict>,
) -> PyResult<crate::approval_gate::ApprovalGateMap> {
use crate::approval_gate::{ApprovalGateMap, ToolApprovalGate};
let mut gm = ApprovalGateMap::new();
for (key, value) in dict.iter() {
let tool: String = key.extract()?;
if value.is_none() {
gm.insert(tool, ToolApprovalGate::whole_tool());
} else if let Ok(args_dict) = value.downcast::<PyDict>() {
let mut args = std::collections::BTreeMap::new();
for (ak, av) in args_dict.iter() {
let arg_name: String = ak.extract()?;
let gate = py_to_arg_approval_gate(&arg_name, &av)?;
args.insert(arg_name, gate);
}
gm.insert(tool, ToolApprovalGate::with_args(args));
} else {
return Err(pyo3::exceptions::PyValueError::new_err(format!(
"approval gate for '{}' must be None (whole-tool) or a dict of arg approval gates",
tool
)));
}
}
Ok(gm)
}
/// Python class `WarrantType`, wrapping the crate's `WarrantType` enum.
#[pyclass(name = "WarrantType")]
#[derive(Clone, Copy)]
pub struct PyWarrantType {
    inner: WarrantType,
}

#[pymethods]
impl PyWarrantType {
    /// Parse a warrant type from its name (case-insensitive).
    ///
    /// Accepts `'execution'` or `'issuer'`; anything else raises `ValueError`.
    #[new]
    fn new(warrant_type: &str) -> PyResult<Self> {
        let lowered = warrant_type.to_lowercase();
        let inner = if lowered == "execution" {
            WarrantType::Execution
        } else if lowered == "issuer" {
            WarrantType::Issuer
        } else {
            return Err(PyValueError::new_err(
                "WarrantType must be 'execution' or 'issuer'",
            ));
        };
        Ok(Self { inner })
    }

    /// `WarrantType.<Variant>` using the variant's `Debug` name.
    fn __repr__(&self) -> String {
        let variant = self.inner;
        format!("WarrantType.{:?}", variant)
    }

    /// Supports `==` and `!=` only; ordering comparisons raise `TypeError`.
    fn __richcmp__(&self, other: &Self, op: pyo3::basic::CompareOp) -> PyResult<bool> {
        use pyo3::basic::CompareOp;
        match op {
            CompareOp::Eq => Ok(self.inner == other.inner),
            CompareOp::Ne => Ok(self.inner != other.inner),
            _ => Err(pyo3::exceptions::PyTypeError::new_err(
                "Comparison not supported",
            )),
        }
    }

    /// Class attribute for the execution warrant type.
    #[classattr]
    #[allow(non_snake_case)]
    fn Execution() -> Self {
        let inner = WarrantType::Execution;
        Self { inner }
    }

    /// Class attribute for the issuer warrant type.
    #[classattr]
    #[allow(non_snake_case)]
    fn Issuer() -> Self {
        let inner = WarrantType::Issuer;
        Self { inner }
    }
}
/// Python class `Clearance`, wrapping the crate's `Clearance` level.
#[pyclass(name = "Clearance")]
#[derive(Clone, Copy)]
pub struct PyClearance {
    inner: Clearance,
}

#[pymethods]
#[allow(non_snake_case)]
impl PyClearance {
    /// Build a clearance from a string name (parsed via `FromStr`) or from an
    /// integer in `0..=255`.
    ///
    /// Raises `ValueError` when the name does not parse or when the value is
    /// neither a string nor an integer that fits in `u8`.
    #[new]
    fn new(value: &Bound<'_, PyAny>) -> PyResult<Self> {
        if let Ok(s) = value.extract::<String>() {
            let inner = s.parse().map_err(|e: String| PyValueError::new_err(e))?;
            Ok(Self { inner })
        } else if let Ok(n) = value.extract::<u8>() {
            Ok(Self {
                inner: Clearance(n),
            })
        } else {
            Err(PyValueError::new_err(
                "Clearance must be initialized with a string name or integer (0-255)",
            ))
        }
    }

    /// Class attribute for `Clearance::UNTRUSTED`.
    #[classattr]
    fn UNTRUSTED() -> Self {
        Self {
            inner: Clearance::UNTRUSTED,
        }
    }

    /// Class attribute for `Clearance::EXTERNAL`.
    #[classattr]
    fn EXTERNAL() -> Self {
        Self {
            inner: Clearance::EXTERNAL,
        }
    }

    /// Class attribute for `Clearance::PARTNER`.
    #[classattr]
    fn PARTNER() -> Self {
        Self {
            inner: Clearance::PARTNER,
        }
    }

    /// Class attribute for `Clearance::INTERNAL`.
    #[classattr]
    fn INTERNAL() -> Self {
        Self {
            inner: Clearance::INTERNAL,
        }
    }

    /// Class attribute for `Clearance::PRIVILEGED`.
    #[classattr]
    fn PRIVILEGED() -> Self {
        Self {
            inner: Clearance::PRIVILEGED,
        }
    }

    /// Class attribute for `Clearance::SYSTEM`.
    #[classattr]
    fn SYSTEM() -> Self {
        Self {
            inner: Clearance::SYSTEM,
        }
    }

    /// Numeric level, as a method.
    fn value(&self) -> u8 {
        self.inner.level()
    }

    /// Numeric level, as a property.
    #[getter]
    fn level(&self) -> u8 {
        self.inner.level()
    }

    /// Whether this clearance meets the `required` clearance.
    fn meets(&self, required: &Self) -> bool {
        self.inner.meets(required.inner)
    }

    /// Build a clearance from an arbitrary numeric level.
    #[staticmethod]
    fn custom(level: u8) -> Self {
        Self {
            inner: Clearance::custom(level),
        }
    }

    fn __ge__(&self, other: &Self) -> bool {
        self.inner >= other.inner
    }

    fn __le__(&self, other: &Self) -> bool {
        self.inner <= other.inner
    }

    fn __gt__(&self, other: &Self) -> bool {
        self.inner > other.inner
    }

    fn __lt__(&self, other: &Self) -> bool {
        self.inner < other.inner
    }

    fn __eq__(&self, other: &Self) -> bool {
        self.inner == other.inner
    }

    /// Hash derived from the numeric level so that clearances comparing equal
    /// via `__eq__` also hash equal and can be used as dict keys or set
    /// members.
    // NOTE(review): assumes `Clearance` equality is equality of `level()`.
    fn __hash__(&self) -> u64 {
        u64::from(self.inner.level())
    }

    /// Text produced by the wrapped value's `Display` implementation.
    fn __repr__(&self) -> String {
        self.inner.to_string()
    }
}
/// Python class `ChangeType`, wrapping the crate's diff `ChangeType`.
#[pyclass(name = "ChangeType")]
#[derive(Clone)]
pub struct PyChangeType {
    inner: RustChangeType,
}

#[pymethods]
#[allow(non_snake_case)]
impl PyChangeType {
    /// Class attribute for `ChangeType::Unchanged`.
    #[classattr]
    fn UNCHANGED() -> Self {
        let inner = RustChangeType::Unchanged;
        Self { inner }
    }

    /// Class attribute for `ChangeType::Added`.
    #[classattr]
    fn ADDED() -> Self {
        let inner = RustChangeType::Added;
        Self { inner }
    }

    /// Class attribute for `ChangeType::Removed`.
    #[classattr]
    fn REMOVED() -> Self {
        let inner = RustChangeType::Removed;
        Self { inner }
    }

    /// Class attribute for `ChangeType::Narrowed`.
    #[classattr]
    fn NARROWED() -> Self {
        let inner = RustChangeType::Narrowed;
        Self { inner }
    }

    /// Class attribute for `ChangeType::Reduced`.
    #[classattr]
    fn REDUCED() -> Self {
        let inner = RustChangeType::Reduced;
        Self { inner }
    }

    /// Class attribute for `ChangeType::Increased`.
    #[classattr]
    fn INCREASED() -> Self {
        let inner = RustChangeType::Increased;
        Self { inner }
    }

    /// Class attribute for `ChangeType::Demoted`.
    #[classattr]
    fn DEMOTED() -> Self {
        let inner = RustChangeType::Demoted;
        Self { inner }
    }

    /// Class attribute for `ChangeType::Promoted`.
    #[classattr]
    fn PROMOTED() -> Self {
        let inner = RustChangeType::Promoted;
        Self { inner }
    }

    /// Class attribute for `ChangeType::Dropped`.
    #[classattr]
    fn DROPPED() -> Self {
        let inner = RustChangeType::Dropped;
        Self { inner }
    }

    /// String form of the change type, from `as_str()`.
    #[getter]
    fn value(&self) -> &'static str {
        self.inner.as_str()
    }

    /// `ChangeType.<NAME>` with the `as_str()` value upper-cased.
    fn __repr__(&self) -> String {
        let upper = self.inner.as_str().to_uppercase();
        format!("ChangeType.{}", upper)
    }

    fn __eq__(&self, other: &Self) -> bool {
        self.inner == other.inner
    }

    /// Hash of the `as_str()` value, computed with `DefaultHasher`.
    fn __hash__(&self) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let label = self.inner.as_str();
        let mut state = DefaultHasher::new();
        label.hash(&mut state);
        state.finish()
    }
}
/// Python class `ToolsDiff`, a read-only view over the crate's `ToolsDiff`.
#[pyclass(name = "ToolsDiff")]
#[derive(Clone)]
pub struct PyToolsDiff {
    inner: RustToolsDiff,
}

#[pymethods]
impl PyToolsDiff {
    /// Copy of the `parent_tools` list.
    #[getter]
    fn parent_tools(&self) -> Vec<String> {
        let tools = &self.inner.parent_tools;
        tools.clone()
    }

    /// Copy of the `child_tools` list.
    #[getter]
    fn child_tools(&self) -> Vec<String> {
        let tools = &self.inner.child_tools;
        tools.clone()
    }

    /// Copy of the `kept` list.
    #[getter]
    fn kept(&self) -> Vec<String> {
        let tools = &self.inner.kept;
        tools.clone()
    }

    /// Copy of the `dropped` list.
    #[getter]
    fn dropped(&self) -> Vec<String> {
        let tools = &self.inner.dropped;
        tools.clone()
    }

    /// Representation showing the kept and dropped tool lists.
    fn __repr__(&self) -> String {
        let diff = &self.inner;
        format!(
            "ToolsDiff(kept={:?}, dropped={:?})",
            diff.kept, diff.dropped
        )
    }
}
/// Python class `ConstraintDiff`, a read-only view over one field's
/// constraint change.
#[pyclass(name = "ConstraintDiff")]
#[derive(Clone)]
pub struct PyConstraintDiff {
    inner: RustConstraintDiff,
}

#[pymethods]
impl PyConstraintDiff {
    /// Name of the field this diff refers to.
    #[getter]
    fn field(&self) -> &str {
        self.inner.field.as_str()
    }

    /// Parent-side constraint converted with `constraint_to_py`, or `None`.
    #[getter]
    fn parent_constraint(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
        let parent = self.inner.parent_constraint.as_ref();
        parent.map(|c| constraint_to_py(py, c)).transpose()
    }

    /// Child-side constraint converted with `constraint_to_py`, or `None`.
    #[getter]
    fn child_constraint(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
        let child = self.inner.child_constraint.as_ref();
        child.map(|c| constraint_to_py(py, c)).transpose()
    }

    /// Kind of change recorded for this field.
    #[getter]
    fn change(&self) -> PyChangeType {
        let inner = self.inner.change;
        PyChangeType { inner }
    }

    /// Representation showing the field name and change kind.
    fn __repr__(&self) -> String {
        let diff = &self.inner;
        format!(
            "ConstraintDiff(field='{}', change={})",
            diff.field,
            diff.change.as_str()
        )
    }
}
/// Python class `TtlDiff`, a read-only view over a TTL change.
#[pyclass(name = "TtlDiff")]
#[derive(Clone)]
pub struct PyTtlDiff {
    inner: RustTtlDiff,
}

#[pymethods]
impl PyTtlDiff {
    /// The `parent_ttl_seconds` field, if set.
    #[getter]
    fn parent_ttl_seconds(&self) -> Option<i64> {
        let diff = &self.inner;
        diff.parent_ttl_seconds
    }

    /// The `child_ttl_seconds` field, if set.
    #[getter]
    fn child_ttl_seconds(&self) -> Option<i64> {
        let diff = &self.inner;
        diff.child_ttl_seconds
    }

    /// Kind of change recorded for the TTL.
    #[getter]
    fn change(&self) -> PyChangeType {
        let inner = self.inner.change;
        PyChangeType { inner }
    }

    /// Representation showing both TTL values and the change kind.
    fn __repr__(&self) -> String {
        let diff = &self.inner;
        format!(
            "TtlDiff(parent={:?}, child={:?}, change={})",
            diff.parent_ttl_seconds,
            diff.child_ttl_seconds,
            diff.change.as_str()
        )
    }
}
/// Python class `ClearanceDiff`, a read-only view over a clearance change.
#[pyclass(name = "ClearanceDiff")]
#[derive(Clone)]
pub struct PyClearanceDiff {
    inner: RustClearanceDiff,
}

#[pymethods]
impl PyClearanceDiff {
    /// Parent-side clearance wrapped as `PyClearance`, or `None`.
    #[getter]
    fn parent_clearance(&self) -> Option<PyClearance> {
        let parent = self.inner.parent_clearance;
        parent.map(|inner| PyClearance { inner })
    }

    /// Child-side clearance wrapped as `PyClearance`, or `None`.
    #[getter]
    fn child_clearance(&self) -> Option<PyClearance> {
        let child = self.inner.child_clearance;
        child.map(|inner| PyClearance { inner })
    }

    /// Kind of change recorded for the clearance.
    #[getter]
    fn change(&self) -> PyChangeType {
        let inner = self.inner.change;
        PyChangeType { inner }
    }

    /// Representation showing the change kind.
    fn __repr__(&self) -> String {
        let kind = self.inner.change.as_str();
        format!("ClearanceDiff(change={})", kind)
    }
}
/// Python class `DepthDiff`, a read-only view over a depth change.
#[pyclass(name = "DepthDiff")]
#[derive(Clone)]
pub struct PyDepthDiff {
    inner: RustDepthDiff,
}

#[pymethods]
impl PyDepthDiff {
    /// The `parent_depth` field.
    #[getter]
    fn parent_depth(&self) -> u32 {
        let diff = &self.inner;
        diff.parent_depth
    }

    /// The `child_depth` field.
    #[getter]
    fn child_depth(&self) -> u32 {
        let diff = &self.inner;
        diff.child_depth
    }

    /// The `is_terminal` field.
    #[getter]
    fn is_terminal(&self) -> bool {
        let diff = &self.inner;
        diff.is_terminal
    }

    /// Representation showing both depths and the terminal flag.
    fn __repr__(&self) -> String {
        let diff = &self.inner;
        format!(
            "DepthDiff(parent={}, child={}, terminal={})",
            diff.parent_depth, diff.child_depth, diff.is_terminal
        )
    }
}
/// Python class `DelegationDiff`, a read-only view over the crate's
/// `DelegationDiff`.
#[pyclass(name = "DelegationDiff")]
#[derive(Clone)]
pub struct PyDelegationDiff {
    inner: RustDelegationDiff,
}

#[pymethods]
impl PyDelegationDiff {
    /// ID of the parent warrant.
    #[getter]
    fn parent_warrant_id(&self) -> &str {
        self.inner.parent_warrant_id.as_str()
    }

    /// ID of the child warrant, if one is recorded.
    #[getter]
    fn child_warrant_id(&self) -> Option<&str> {
        let child = &self.inner.child_warrant_id;
        child.as_deref()
    }

    /// Diff timestamp formatted as RFC 3339.
    #[getter]
    fn timestamp(&self) -> String {
        let stamp = &self.inner.timestamp;
        stamp.to_rfc3339()
    }

    /// Tool-level diff.
    #[getter]
    fn tools(&self) -> PyToolsDiff {
        let inner = self.inner.tools.clone();
        PyToolsDiff { inner }
    }

    /// Nested dict `tool -> field -> ConstraintDiff`.
    #[getter]
    fn capabilities<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let by_tool = PyDict::new(py);
        for (tool, field_diffs) in &self.inner.capabilities {
            let by_field = PyDict::new(py);
            for (field, diff) in field_diffs {
                let wrapped = PyConstraintDiff {
                    inner: diff.clone(),
                };
                let py_wrapped = wrapped.into_pyobject(py)?;
                by_field.set_item(field, py_wrapped)?;
            }
            by_tool.set_item(tool, by_field)?;
        }
        Ok(by_tool)
    }

    /// TTL diff.
    #[getter]
    fn ttl(&self) -> PyTtlDiff {
        let inner = self.inner.ttl.clone();
        PyTtlDiff { inner }
    }

    /// Clearance diff.
    #[getter]
    fn clearance(&self) -> PyClearanceDiff {
        let inner = self.inner.clearance.clone();
        PyClearanceDiff { inner }
    }

    /// Depth diff.
    #[getter]
    fn depth(&self) -> PyDepthDiff {
        let inner = self.inner.depth.clone();
        PyDepthDiff { inner }
    }

    /// Recorded intent string, if any.
    #[getter]
    fn intent(&self) -> Option<&str> {
        let intent = &self.inner.intent;
        intent.as_deref()
    }

    /// JSON form of the diff; failures raise `RuntimeError`.
    fn to_json(&self) -> PyResult<String> {
        match self.inner.to_json() {
            Ok(json) => Ok(json),
            Err(err) => Err(PyRuntimeError::new_err(err.to_string())),
        }
    }

    /// Human-readable form of the diff.
    fn to_human(&self) -> String {
        self.inner.to_human()
    }

    /// SIEM JSON form of the diff; failures raise `RuntimeError`.
    fn to_siem_json(&self) -> PyResult<String> {
        match self.inner.to_siem_json() {
            Ok(json) => Ok(json),
            Err(err) => Err(PyRuntimeError::new_err(err.to_string())),
        }
    }

    /// Representation showing the parent and child warrant IDs.
    fn __repr__(&self) -> String {
        let diff = &self.inner;
        format!(
            "DelegationDiff(parent='{}', child={:?})",
            diff.parent_warrant_id, diff.child_warrant_id
        )
    }
}
/// Python class `DelegationReceipt`, a read-only view over the crate's
/// `DelegationReceipt`.
#[pyclass(name = "DelegationReceipt")]
#[derive(Clone)]
pub struct PyDelegationReceipt {
    inner: RustDelegationReceipt,
}

#[pymethods]
impl PyDelegationReceipt {
    /// Build a receipt from a diff plus the child warrant ID and the
    /// delegator/delegatee fingerprints.
    #[staticmethod]
    fn from_diff(
        diff: &PyDelegationDiff,
        child_warrant_id: &str,
        delegator_fingerprint: &str,
        delegatee_fingerprint: &str,
    ) -> Self {
        let inner = RustDelegationReceipt::from_diff(
            diff.inner.clone(),
            child_warrant_id.to_string(),
            delegator_fingerprint.to_string(),
            delegatee_fingerprint.to_string(),
        );
        Self { inner }
    }

    /// ID of the parent warrant.
    #[getter]
    fn parent_warrant_id(&self) -> &str {
        self.inner.parent_warrant_id.as_str()
    }

    /// ID of the child warrant.
    #[getter]
    fn child_warrant_id(&self) -> &str {
        self.inner.child_warrant_id.as_str()
    }

    /// Receipt timestamp formatted as RFC 3339.
    #[getter]
    fn timestamp(&self) -> String {
        let stamp = &self.inner.timestamp;
        stamp.to_rfc3339()
    }

    /// Tool-level diff.
    #[getter]
    fn tools(&self) -> PyToolsDiff {
        let inner = self.inner.tools.clone();
        PyToolsDiff { inner }
    }

    /// Nested dict `tool -> field -> ConstraintDiff`.
    #[getter]
    fn capabilities<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let by_tool = PyDict::new(py);
        for (tool, field_diffs) in &self.inner.capabilities {
            let by_field = PyDict::new(py);
            for (field, diff) in field_diffs {
                let wrapped = PyConstraintDiff {
                    inner: diff.clone(),
                };
                let py_wrapped = wrapped.into_pyobject(py)?;
                by_field.set_item(field, py_wrapped)?;
            }
            by_tool.set_item(tool, by_field)?;
        }
        Ok(by_tool)
    }

    /// TTL diff.
    #[getter]
    fn ttl(&self) -> PyTtlDiff {
        let inner = self.inner.ttl.clone();
        PyTtlDiff { inner }
    }

    /// Clearance diff.
    #[getter]
    fn clearance(&self) -> PyClearanceDiff {
        let inner = self.inner.clearance.clone();
        PyClearanceDiff { inner }
    }

    /// Depth diff.
    #[getter]
    fn depth(&self) -> PyDepthDiff {
        let inner = self.inner.depth.clone();
        PyDepthDiff { inner }
    }

    /// Fingerprint of the delegator.
    #[getter]
    fn delegator_fingerprint(&self) -> &str {
        self.inner.delegator_fingerprint.as_str()
    }

    /// Fingerprint of the delegatee.
    #[getter]
    fn delegatee_fingerprint(&self) -> &str {
        self.inner.delegatee_fingerprint.as_str()
    }

    /// Recorded intent string, if any.
    #[getter]
    fn intent(&self) -> Option<&str> {
        let intent = &self.inner.intent;
        intent.as_deref()
    }

    /// The `used_pass_through` flag.
    #[getter]
    fn used_pass_through(&self) -> bool {
        let receipt = &self.inner;
        receipt.used_pass_through
    }

    /// The `pass_through_reason` string, if any.
    #[getter]
    fn pass_through_reason(&self) -> Option<&str> {
        let reason = &self.inner.pass_through_reason;
        reason.as_deref()
    }

    /// JSON form of the receipt; failures raise `RuntimeError`.
    fn to_json(&self) -> PyResult<String> {
        match self.inner.to_json() {
            Ok(json) => Ok(json),
            Err(err) => Err(PyRuntimeError::new_err(err.to_string())),
        }
    }

    /// SIEM JSON form of the receipt; failures raise `RuntimeError`.
    fn to_siem_json(&self) -> PyResult<String> {
        match self.inner.to_siem_json() {
            Ok(json) => Ok(json),
            Err(err) => Err(PyRuntimeError::new_err(err.to_string())),
        }
    }

    /// Representation showing the parent and child warrant IDs.
    fn __repr__(&self) -> String {
        let receipt = &self.inner;
        format!(
            "DelegationReceipt(parent='{}', child='{}')",
            receipt.parent_warrant_id, receipt.child_warrant_id
        )
    }
}
/// Python class `AttenuationBuilder`, wrapping `OwnedAttenuationBuilder`.
#[pyclass(name = "AttenuationBuilder")]
pub struct PyAttenuationBuilder {
    inner: OwnedAttenuationBuilder,
}

#[pymethods]
impl PyAttenuationBuilder {
    /// Set the constraint set for `tool` from a `field -> constraint` dict.
    fn with_capability(&mut self, tool: &str, constraints: &Bound<'_, PyDict>) -> PyResult<()> {
        let parsed = py_dict_to_constraint_set(constraints)?;
        self.inner.set_capability(tool, parsed);
        Ok(())
    }

    /// Forward to the builder's `inherit_all`.
    fn inherit_all(&mut self) {
        self.inner.inherit_all();
    }

    /// Set the TTL, given in whole seconds.
    fn with_ttl(&mut self, seconds: u64) {
        let ttl = Duration::from_secs(seconds);
        self.inner.set_ttl(ttl);
    }

    /// Set the holder public key.
    fn with_holder(&mut self, holder: &PyPublicKey) {
        let key = holder.inner.clone();
        self.inner.set_holder(key);
    }

    /// Set the clearance level.
    fn with_clearance(&mut self, level: &PyClearance) {
        self.inner.set_clearance(level.inner);
    }

    /// Set the intent string.
    fn with_intent(&mut self, intent: &str) {
        self.inner.set_intent(intent);
    }

    /// Parse and encode an approval-gate dict, then store it as the
    /// approval-gates extension.
    fn with_approval_gates(&mut self, approval_gates: &Bound<'_, PyDict>) -> PyResult<()> {
        let gates = py_dict_to_approval_gate_map(approval_gates)?;
        let encoded = match crate::approval_gate::encode_approval_gate_map(&gates) {
            Ok(encoded) => encoded,
            Err(err) => return Err(to_py_err(err)),
        };
        let stored = self.inner.set_approval_gates_extension(encoded);
        stored.map_err(to_py_err)
    }

    /// Forward to the builder's `retain_capability` for one tool.
    fn with_tool(&mut self, tool: &str) {
        self.inner.retain_capability(tool);
    }

    /// Forward to the builder's `retain_capabilities` for several tools.
    fn with_tools(&mut self, tools: Vec<String>) {
        self.inner.retain_capabilities(&tools);
    }

    /// Forward to the builder's `set_issuable_tool`.
    fn with_issuable_tool(&mut self, tool: &str) {
        self.inner.set_issuable_tool(tool);
    }

    /// Forward to the builder's `set_issuable_tools`.
    fn with_issuable_tools(&mut self, tools: Vec<String>) {
        self.inner.set_issuable_tools(tools);
    }

    /// Forward to the builder's `drop_issuable_tools`.
    fn drop_tools(&mut self, tools: Vec<String>) {
        self.inner.drop_issuable_tools(tools);
    }

    /// Forward to the builder's `set_terminal`.
    fn terminal(&mut self) {
        self.inner.set_terminal();
    }

    /// Copy of the parent warrant.
    #[getter]
    fn parent(&self) -> PyWarrant {
        let inner = self.inner.parent().clone();
        PyWarrant { inner }
    }

    /// Nested dict `tool -> field -> constraint` of the builder's current
    /// tools.
    #[getter]
    fn capabilities<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let by_tool = PyDict::new(py);
        for (tool, constraint_set) in self.inner.tools().iter() {
            let by_field = PyDict::new(py);
            for (field, constraint) in constraint_set.iter() {
                by_field.set_item(field, constraint_to_py(py, constraint)?)?;
            }
            by_tool.set_item(tool, by_field)?;
        }
        Ok(by_tool)
    }

    /// Configured TTL in seconds, if any.
    #[getter]
    fn ttl_seconds(&self) -> Option<u64> {
        self.inner.ttl_seconds()
    }

    /// Configured holder key, if any.
    #[getter]
    fn holder(&self) -> Option<PyPublicKey> {
        let holder = self.inner.get_holder();
        holder.map(|key| PyPublicKey { inner: key.clone() })
    }

    /// Configured clearance, if any.
    #[getter]
    fn clearance(&self) -> Option<PyClearance> {
        let level = self.inner.clearance();
        level.map(|inner| PyClearance { inner })
    }

    /// Configured intent, if any.
    #[getter]
    fn intent(&self) -> Option<String> {
        let intent = self.inner.intent();
        intent.map(|text| text.to_string())
    }

    /// Human-readable diff between the parent and the configured child.
    fn diff(&self) -> String {
        let computed = self.inner.diff();
        computed.to_human()
    }

    /// Structured diff between the parent and the configured child.
    fn diff_structured(&self) -> PyDelegationDiff {
        let inner = self.inner.diff();
        PyDelegationDiff { inner }
    }

    /// Build the attenuated warrant with `signing_key`.
    ///
    /// Works on a clone of the builder, so this object stays usable.
    #[pyo3(name = "delegate")]
    fn delegate(&self, signing_key: &PySigningKey) -> PyResult<PyWarrant> {
        let builder = self.inner.clone();
        match builder.build(&signing_key.inner) {
            Ok(inner) => Ok(PyWarrant { inner }),
            Err(err) => Err(to_py_err(err)),
        }
    }

    /// Like `delegate`, but also returns the delegation receipt.
    #[pyo3(name = "delegate_with_receipt")]
    fn delegate_with_receipt(
        &self,
        signing_key: &PySigningKey,
    ) -> PyResult<(PyWarrant, PyDelegationReceipt)> {
        let builder = self.inner.clone();
        match builder.build_with_receipt(&signing_key.inner) {
            Ok((warrant, receipt)) => Ok((
                PyWarrant { inner: warrant },
                PyDelegationReceipt { inner: receipt },
            )),
            Err(err) => Err(to_py_err(err)),
        }
    }

    /// Representation showing the parent ID, the TTL, and whether a holder
    /// is set.
    fn __repr__(&self) -> String {
        let builder = &self.inner;
        format!(
            "AttenuationBuilder(parent={}, ttl={:?}, holder={:?})",
            builder.parent().id(),
            builder.ttl_seconds(),
            builder.get_holder().is_some()
        )
    }
}
#[pyclass(name = "IssuanceBuilder")]
pub struct PyIssuanceBuilder {
inner: OwnedIssuanceBuilder,
}
#[pymethods]
impl PyIssuanceBuilder {
fn with_capability(&mut self, tool: &str, constraints: &Bound<'_, PyDict>) -> PyResult<()> {
let constraint_set = py_dict_to_constraint_set(constraints)?;
self.inner.set_capability(tool, constraint_set);
Ok(())
}
fn with_clearance(&mut self, level: &PyClearance) {
self.inner.set_clearance(level.inner);
}
fn with_ttl(&mut self, seconds: u64) {
self.inner.set_ttl(Duration::from_secs(seconds));
}
fn with_max_depth(&mut self, max_depth: u32) {
self.inner.set_max_depth(max_depth);
}
fn with_session_id(&mut self, session_id: &str) {
self.inner.set_session_id(session_id);
}
fn with_agent_id(&mut self, agent_id: &str) {
self.inner.set_agent_id(agent_id);
}
fn with_holder(&mut self, holder: &PyPublicKey) {
self.inner.set_holder(holder.inner.clone());
}
fn with_required_approvers(&mut self, approvers: &Bound<'_, PyAny>) -> PyResult<()> {
let py_list = approvers.downcast::<pyo3::types::PyList>()?;
let mut rust_approvers = Vec::new();
for item in py_list.iter() {
let pk: PyPublicKey = item.extract()?;
rust_approvers.push(pk.inner);
}
self.inner.set_required_approvers(rust_approvers);
Ok(())
}
fn with_min_approvals(&mut self, min: u32) {
self.inner.set_min_approvals(min);
}
fn with_approval_gates(&mut self, approval_gates: &Bound<'_, PyDict>) -> PyResult<()> {
let approval_gate_map = py_dict_to_approval_gate_map(approval_gates)?;
let bytes = crate::approval_gate::encode_approval_gate_map(&approval_gate_map)
.map_err(to_py_err)?;
self.inner
.set_approval_gates_extension(bytes)
.map_err(to_py_err)
}
fn with_intent(&mut self, intent: &str) {
self.inner.set_intent(intent);
}
fn terminal(&mut self) {
self.inner.set_terminal();
}
fn with_tool(&mut self, tool: &str) {
let empty = crate::constraints::ConstraintSet::new();
self.inner.set_capability(tool, empty);
}
#[getter]
fn issuer(&self) -> PyWarrant {
PyWarrant {
inner: self.inner.issuer().clone(),
}
}
#[getter]
fn tools(&self) -> Option<Vec<String>> {
let caps = self.inner.tools();
if caps.is_empty() {
None
} else {
let mut keys: Vec<String> = caps.keys().cloned().collect();
keys.sort();
Some(keys)
}
}
#[getter]
fn ttl_seconds(&self) -> Option<u64> {
self.inner.ttl_seconds()
}
#[getter]
fn holder(&self) -> Option<PyPublicKey> {
self.inner
.holder()
.map(|pk| PyPublicKey { inner: pk.clone() })
}
#[getter]
fn clearance(&self) -> Option<PyClearance> {
self.inner.clearance().map(|tl| PyClearance { inner: tl })
}
#[getter]
fn intent(&self) -> Option<String> {
self.inner.intent().map(|s| s.to_string())
}
fn build(&self, signing_key: &PySigningKey) -> PyResult<PyWarrant> {
let warrant = self
.inner
.clone()
.build(&signing_key.inner)
.map_err(to_py_err)?;
Ok(PyWarrant { inner: warrant })
}
fn __repr__(&self) -> String {
format!(
"IssuanceBuilder(issuer={}, tools={:?}, holder={:?})",
self.inner.issuer().id(),
self.inner.tools().keys().collect::<Vec<_>>(),
self.inner.get_holder().is_some()
)
}
}
/// Python class `Warrant` (subclassable from Python), wrapping the crate's
/// `Warrant` in `inner`.
#[pyclass(name = "Warrant", subclass)]
pub struct PyWarrant {
inner: RustWarrant,
}
#[pymethods]
impl PyWarrant {
#[new]
fn new(token: String) -> PyResult<Self> {
let inner = crate::wire::decode_base64(&token).map_err(to_py_err)?;
Ok(Self { inner })
}
/// Base64 encoding of the warrant; encode errors are converted with
/// `to_py_err`.
fn __str__(&self) -> PyResult<String> {
    let encoded = crate::wire::encode_base64(&self.inner);
    encoded.map_err(to_py_err)
}
#[staticmethod]
#[pyo3(signature = (keypair, capabilities=None, ttl_seconds=3600, holder=None, session_id=None, clearance=None, required_approvers=None, min_approvals=None, approval_gates=None))]
#[allow(clippy::too_many_arguments)]
fn issue(
keypair: &PySigningKey,
capabilities: Option<&Bound<'_, PyDict>>,
ttl_seconds: u64,
holder: Option<&PyPublicKey>,
session_id: Option<&str>,
clearance: Option<&PyClearance>,
required_approvers: Option<Vec<PyPublicKey>>,
min_approvals: Option<u32>,
approval_gates: Option<&Bound<'_, PyDict>>,
) -> PyResult<Self> {
let mut builder = RustWarrant::builder().ttl(Duration::from_secs(ttl_seconds));
if let Some(caps_dict) = capabilities {
for (tool_key, constraints_val) in caps_dict.iter() {
let tool_name: String = tool_key.extract()?;
let constraints_dict: &Bound<'_, PyDict> = constraints_val
.downcast()
.map_err(|_| PyValueError::new_err("capabilities values must be dicts"))?;
let constraint_set = py_dict_to_constraint_set(constraints_dict)?;
builder = builder.capability(tool_name, constraint_set);
}
}
if let Some(tl) = clearance {
builder = builder.clearance(tl.inner);
}
if let Some(h) = holder {
builder = builder.holder(h.inner.clone());
} else {
builder = builder.holder(keypair.inner.public_key());
}
if let Some(sid) = session_id {
builder = builder.session_id(sid);
}
if let Some(approvers) = required_approvers {
let rust_approvers: Vec<crate::crypto::PublicKey> =
approvers.into_iter().map(|pk| pk.inner).collect();
builder = builder.required_approvers(rust_approvers);
}
if let Some(min) = min_approvals {
builder = builder.min_approvals(min);
}
if let Some(approval_gates_dict) = approval_gates {
let approval_gate_map = py_dict_to_approval_gate_map(approval_gates_dict)?;
let bytes = crate::approval_gate::encode_approval_gate_map(&approval_gate_map)
.map_err(to_py_err)?;
builder = builder.extension(
crate::approval_gate::APPROVAL_GATE_EXTENSION_KEY.to_string(),
bytes,
);
}
let warrant = builder.build(&keypair.inner).map_err(to_py_err)?;
Ok(Self { inner: warrant })
}
#[staticmethod]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (issuable_tools, keypair, constraint_bounds=None, max_issue_depth=None, ttl_seconds=3600, holder=None, session_id=None, clearance=None))]
fn issue_issuer(
issuable_tools: Vec<String>,
keypair: &PySigningKey,
constraint_bounds: Option<&Bound<'_, PyDict>>,
max_issue_depth: Option<u32>,
ttl_seconds: u64,
holder: Option<&PyPublicKey>,
session_id: Option<&str>,
clearance: Option<&PyClearance>,
) -> PyResult<Self> {
let mut builder = RustWarrant::builder()
.r#type(WarrantType::Issuer)
.issuable_tools(issuable_tools)
.ttl(Duration::from_secs(ttl_seconds));
if let Some(depth) = max_issue_depth {
builder = builder.max_issue_depth(depth);
}
if let Some(tl) = clearance {
builder = builder.clearance(tl.inner);
}
if let Some(h) = holder {
builder = builder.holder(h.inner.clone());
} else {
builder = builder.holder(keypair.inner.public_key());
}
if let Some(sid) = session_id {
builder = builder.session_id(sid);
}
if let Some(bounds_dict) = constraint_bounds {
for (key, value) in bounds_dict.iter() {
let field: String = key.extract()?;
let constraint = py_to_constraint(&value)?;
builder = builder.constraint_bound(field, constraint);
}
}
let warrant = builder.build(&keypair.inner).map_err(to_py_err)?;
Ok(Self { inner: warrant })
}
/// Start an `IssuanceBuilder` seeded with a clone of this warrant.
///
/// Raises `ValueError` unless this warrant is of type `Issuer`.
fn issue_execution(&self) -> PyResult<PyIssuanceBuilder> {
    if self.inner.r#type() == WarrantType::Issuer {
        let inner = OwnedIssuanceBuilder::new(self.inner.clone());
        return Ok(PyIssuanceBuilder { inner });
    }
    Err(PyValueError::new_err(
        "can only issue execution warrants from issuer warrants",
    ))
}
/// The warrant's type wrapped as `PyWarrantType`.
#[getter]
fn warrant_type(&self) -> PyWarrantType {
    PyWarrantType {
        inner: self.inner.r#type(),
    }
}
/// The warrant's ID rendered as a string.
#[getter]
fn id(&self) -> String {
self.inner.id().to_string()
}
/// Sorted tool names from the warrant's capabilities, or `None` when the
/// warrant has no capabilities map.
#[getter]
fn tools(&self) -> Option<Vec<String>> {
    let caps = self.inner.capabilities()?;
    let mut names: Vec<String> = caps.keys().cloned().collect();
    names.sort();
    Some(names)
}
/// Copy of the warrant's issuable tools, or `None` when it has none.
#[getter]
fn issuable_tools(&self) -> Option<Vec<String>> {
self.inner.issuable_tools().map(|t| t.to_vec())
}
/// The wrapped warrant's `max_issue_depth()`, if set.
#[getter]
fn max_issue_depth(&self) -> Option<u32> {
self.inner.max_issue_depth()
}
/// The wrapped warrant's `depth()`.
#[getter]
fn depth(&self) -> u32 {
self.inner.depth()
}
/// The wrapped warrant's `max_depth()`, if set.
#[getter]
fn max_depth(&self) -> Option<u32> {
self.inner.max_depth()
}
/// Seconds remaining until the warrant expires, or `0` once it has expired.
///
/// The subtraction is done on signed Unix timestamps and clamped at zero.
/// Casting each timestamp to `u64` first would wrap a negative (pre-epoch)
/// value into a huge number and report an enormous TTL.
fn ttl_seconds(&self) -> u64 {
    let now = chrono::Utc::now().timestamp();
    let expires = self.inner.expires_at().timestamp();
    // Non-negative after `max(0)`, so the cast to u64 is lossless.
    expires.saturating_sub(now).max(0) as u64
}
/// Hex encoding of the parent hash, or `None` when there is no parent hash.
#[getter]
fn parent_hash(&self) -> Option<String> {
self.inner.parent_hash().map(hex::encode)
}
/// The wrapped warrant's session ID, if set.
#[getter]
fn session_id(&self) -> Option<&str> {
self.inner.session_id()
}
/// Copy of the warrant's authorized holder key.
#[getter]
fn authorized_holder(&self) -> PyPublicKey {
    let inner = self.inner.authorized_holder().clone();
    PyPublicKey { inner }
}
/// Copy of the warrant's payload bytes.
#[getter]
fn payload_bytes(&self) -> Vec<u8> {
self.inner.payload_bytes().to_vec()
}
/// Copy of the warrant's issuer key.
#[getter]
fn issuer(&self) -> PyPublicKey {
    let inner = self.inner.issuer().clone();
    PyPublicKey { inner }
}
/// The warrant's clearance wrapped as `PyClearance`, if set.
#[getter]
fn clearance(&self) -> Option<PyClearance> {
    let level = self.inner.clearance();
    level.map(|inner| PyClearance { inner })
}
/// Result of the wrapped warrant's `is_expired()`.
fn is_expired(&self) -> bool {
self.inner.is_expired()
}
/// Result of the wrapped warrant's `is_terminal()`.
fn is_terminal(&self) -> bool {
self.inner.is_terminal()
}
/// Expiry time formatted as an RFC 3339 string.
fn expires_at(&self) -> String {
self.inner.expires_at().to_rfc3339()
}
/// Nested dict `tool -> field -> constraint`, or `None` when the warrant
/// has no capabilities map.
#[getter]
fn capabilities<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyDict>>> {
    let caps = match self.inner.capabilities() {
        Some(caps) => caps,
        None => return Ok(None),
    };
    let by_tool = PyDict::new(py);
    for (tool, constraint_set) in caps.iter() {
        let by_field = PyDict::new(py);
        for (field, constraint) in constraint_set.iter() {
            by_field.set_item(field, constraint_to_py(py, constraint)?)?;
        }
        by_tool.set_item(tool, by_field)?;
    }
    Ok(Some(by_tool))
}
/// Dict `field -> constraint` of the warrant's constraint bounds, or `None`
/// when it has none.
fn constraint_bounds_dict<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyDict>>> {
    let bounds = match self.inner.constraint_bounds() {
        Some(bounds) => bounds,
        None => return Ok(None),
    };
    let by_field = PyDict::new(py);
    for (field, constraint) in bounds.iter() {
        by_field.set_item(field, constraint_to_py(py, constraint)?)?;
    }
    Ok(Some(by_field))
}
/// Creates an attenuation builder seeded with a clone of this warrant.
fn attenuate_builder(&self) -> PyAttenuationBuilder {
    let parent = self.inner.clone();
    let inner = OwnedAttenuationBuilder::new(parent);
    PyAttenuationBuilder { inner }
}
/// One-shot attenuation: derives a child warrant from this one.
///
/// * `capabilities` - dict mapping tool name to a dict of constraints; every
///   value must itself be a dict, otherwise `ValueError` is raised.
/// * `signing_key` - key handed to the builder's `build` step.
/// * `ttl_seconds`, `holder`, `clearance` - optional settings, applied to the
///   builder only when provided.
///
/// Builder failures are converted to Python exceptions via `to_py_err`.
#[pyo3(signature = (capabilities, signing_key, ttl_seconds=None, holder=None, clearance=None))]
fn attenuate(
    &self,
    capabilities: &Bound<'_, PyDict>,
    signing_key: &PySigningKey,
    ttl_seconds: Option<u64>,
    holder: Option<&PyPublicKey>,
    clearance: Option<&PyClearance>,
) -> PyResult<PyWarrant> {
    let builder = self.inner.attenuate();
    let builder = match ttl_seconds {
        Some(secs) => builder.ttl(Duration::from_secs(secs)),
        None => builder,
    };
    let mut builder = match holder {
        Some(key) => builder.holder(key.inner.clone()),
        None => builder,
    };
    for (tool_key, spec) in capabilities.iter() {
        let tool_name = tool_key.extract::<String>()?;
        let spec_dict = spec
            .downcast::<PyDict>()
            .map_err(|_| PyValueError::new_err("capabilities values must be dicts"))?;
        let constraint_set = py_dict_to_constraint_set(spec_dict)?;
        builder = builder.capability(tool_name, constraint_set);
    }
    let builder = match clearance {
        Some(level) => builder.clearance(level.inner),
        None => builder,
    };
    builder
        .build(&signing_key.inner)
        .map(|inner| PyWarrant { inner })
        .map_err(to_py_err)
}
fn check_constraints(&self, tool: &str, args: &Bound<'_, PyDict>) -> PyResult<Option<String>> {
let mut rust_args = HashMap::new();
for (key, value) in args.iter() {
let field: String = key.extract()?;
let cv = py_to_constraint_value(&value)?;
rust_args.insert(field, cv);
}
match self.inner.check_constraints(tool, &rust_args) {
Ok(()) => Ok(None),
Err(crate::error::Error::ConstraintNotSatisfied { field, reason }) => Ok(Some(
format!("Constraint '{}' not satisfied: {}", field, reason),
)),
Err(e) => Ok(Some(format!("{}", e))),
}
}
/// Like `check_constraints`, but returns a structured `(field, reason)` pair.
///
/// Returns `None` on success. A `ConstraintNotSatisfied` error yields its
/// own field and reason; any other error is reported under the pseudo-field
/// `"_error"` with the error's display text as the reason.
fn check_constraints_detailed(
    &self,
    tool: &str,
    args: &Bound<'_, PyDict>,
) -> PyResult<Option<(String, String)>> {
    let rust_args = args
        .iter()
        .map(|(key, value)| -> PyResult<_> {
            Ok((key.extract::<String>()?, py_to_constraint_value(&value)?))
        })
        .collect::<PyResult<HashMap<_, _>>>()?;
    let failure = match self.inner.check_constraints(tool, &rust_args) {
        Ok(()) => None,
        Err(crate::error::Error::ConstraintNotSatisfied { field, reason }) => {
            Some((field, reason))
        }
        Err(other) => Some(("_error".to_string(), other.to_string())),
    };
    Ok(failure)
}
/// Returns an owned string copy of the wrapped warrant's `agent_id()`, or
/// `None` when the warrant reports none.
fn agent_id(&self) -> Option<String> {
self.inner.agent_id().map(|s| s.to_string())
}
/// Returns the wrapped warrant's `requires_multisig()` result.
fn requires_multisig(&self) -> bool {
self.inner.requires_multisig()
}
/// Returns the warrant's `required_approvers()` as a list of `PublicKey`
/// wrappers (each key cloned), or `None` when the warrant reports none.
fn required_approvers(&self) -> Option<Vec<PyPublicKey>> {
    let approvers = self.inner.required_approvers()?;
    let wrapped = approvers
        .iter()
        .cloned()
        .map(|inner| PyPublicKey { inner })
        .collect();
    Some(wrapped)
}
/// Returns the wrapped warrant's `min_approvals()` value; a Rust `None` is
/// surfaced to Python as `None`.
fn min_approvals(&self) -> Option<u32> {
self.inner.min_approvals()
}
/// Returns the wrapped warrant's `approval_threshold()` value.
fn approval_threshold(&self) -> u32 {
self.inner.approval_threshold()
}
/// Returns a new dict holding every entry of the warrant's `extensions()`,
/// with each value passed to Python as a byte slice.
fn extensions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
    let out = PyDict::new(py);
    for (name, payload) in self.inner.extensions().iter() {
        let raw = payload.as_slice();
        out.set_item(name, raw)?;
    }
    Ok(out)
}
/// Returns an owned copy of the extension bytes stored under `key`, or
/// `None` when the wrapped warrant's `extension(key)` yields nothing.
fn extension(&self, key: &str) -> Option<Vec<u8>> {
self.inner.extension(key).cloned()
}
/// Runs the warrant's `validate()` and `validate_constraint_depth()` checks
/// (in that order) and returns the display text of every error they report.
/// An empty list means both checks passed.
fn validate_warrant(&self) -> Vec<String> {
    let structural = self.inner.validate().err().map(|e| e.to_string());
    let depth = self
        .inner
        .validate_constraint_depth()
        .err()
        .map(|e| e.to_string());
    structural.into_iter().chain(depth).collect()
}
/// Verifies this warrant against a raw 32-byte public key.
///
/// Raises `ValueError` when `public_key_bytes` is not exactly 32 bytes and
/// propagates key-parsing errors. A failed verification is reported as
/// `False`, never as an exception.
fn verify(&self, public_key_bytes: &[u8]) -> PyResult<bool> {
    let raw_key = <[u8; 32]>::try_from(public_key_bytes)
        .map_err(|_| PyValueError::new_err("public key must be exactly 32 bytes"))?;
    let key = crate::crypto::PublicKey::from_bytes(&raw_key).map_err(to_py_err)?;
    Ok(self.inner.verify(&key).is_ok())
}
/// Signs a `(tool, args)` request with `keypair` at the caller-supplied
/// `timestamp` and returns the raw signature bytes.
///
/// Argument conversion errors propagate; signing errors are converted with
/// `to_py_err`.
fn sign(
    &self,
    keypair: &PySigningKey,
    tool: &str,
    args: &Bound<'_, PyDict>,
    timestamp: i64,
) -> PyResult<Vec<u8>> {
    let rust_args = args
        .iter()
        .map(|(key, value)| -> PyResult<_> {
            Ok((key.extract::<String>()?, py_to_constraint_value(&value)?))
        })
        .collect::<PyResult<HashMap<_, _>>>()?;
    self.inner
        .sign_with_timestamp(&keypair.inner, tool, &rust_args, Some(timestamp))
        .map(|signature| signature.to_bytes().to_vec())
        .map_err(to_py_err)
}
/// Converts `args` to core constraint values and returns the warrant's
/// `dedup_key(tool, args)` string.
fn dedup_key(&self, tool: &str, args: &Bound<'_, PyDict>) -> PyResult<String> {
    let rust_args = args
        .iter()
        .map(|(key, value)| -> PyResult<_> {
            Ok((key.extract::<String>()?, py_to_constraint_value(&value)?))
        })
        .collect::<PyResult<HashMap<_, _>>>()?;
    let key = self.inner.dedup_key(tool, &rust_args);
    Ok(key)
}
/// Static method: returns the core `Warrant::dedup_ttl_secs()` value.
#[staticmethod]
fn dedup_ttl_secs() -> i64 {
crate::warrant::Warrant::dedup_ttl_secs()
}
/// Encodes the warrant with `wire::encode_base64`; encoding errors are
/// converted to Python exceptions via `to_py_err`.
fn to_base64(&self) -> PyResult<String> {
wire::encode_base64(&self.inner).map_err(to_py_err)
}
/// Encodes the warrant with `wire::encode`; encoding errors are converted
/// to Python exceptions via `to_py_err`.
fn to_bytes(&self) -> PyResult<Vec<u8>> {
wire::encode(&self.inner).map_err(to_py_err)
}
/// Encodes the warrant with `wire::encode_pem`; encoding errors are
/// converted to Python exceptions via `to_py_err`.
fn to_pem(&self) -> PyResult<String> {
wire::encode_pem(&self.inner).map_err(to_py_err)
}
/// Static constructor: decodes a warrant from the string accepted by
/// `wire::decode_base64`. Decode errors are converted via `to_py_err`.
#[staticmethod]
fn from_base64(s: &str) -> PyResult<Self> {
    wire::decode_base64(s)
        .map(|inner| Self { inner })
        .map_err(to_py_err)
}
/// Static constructor: decodes a warrant from the bytes accepted by
/// `wire::decode`. Decode errors are converted via `to_py_err`.
#[staticmethod]
fn from_bytes(data: &[u8]) -> PyResult<Self> {
    wire::decode(data)
        .map(|inner| Self { inner })
        .map_err(to_py_err)
}
/// Debug-style representation showing the warrant id, type, tool names
/// (or `None` when the payload has no tools) and depth.
fn __repr__(&self) -> String {
    let tools = &self.inner.payload.tools;
    let tool_repr = if tools.is_empty() {
        "None".to_string()
    } else {
        format!("{:?}", tools.keys())
    };
    format!(
        "Warrant(id='{}', type={:?}, tool={}, depth={})",
        self.inner.id(),
        self.inner.r#type(),
        tool_repr,
        self.inner.depth()
    )
}
}
/// Python class `McpConfig`: owns a loaded [`McpConfig`].
#[pyclass(name = "McpConfig")]
pub struct PyMcpConfig {
    inner: McpConfig,
}

#[pymethods]
impl PyMcpConfig {
    /// Static constructor: loads the configuration stored at `path`.
    /// Loader errors are converted with `config_err_to_py`.
    #[staticmethod]
    fn from_file(path: &str) -> PyResult<Self> {
        let inner = McpConfig::from_file(path).map_err(config_err_to_py)?;
        Ok(Self { inner })
    }
}
/// Python class `CompiledMcpConfig`: a shared handle to a compiled MCP
/// configuration.
#[pyclass(name = "CompiledMcpConfig")]
pub struct PyCompiledMcpConfig {
    inner: Arc<CompiledMcpConfig>,
}

#[pymethods]
impl PyCompiledMcpConfig {
    /// Static constructor: compiles a clone of `config`. Compilation errors
    /// are converted via `to_py_err`.
    #[staticmethod]
    fn compile(config: &PyMcpConfig) -> PyResult<Self> {
        CompiledMcpConfig::compile(config.inner.clone())
            .map(|compiled| Self {
                inner: Arc::new(compiled),
            })
            .map_err(to_py_err)
    }

    /// Returns the list of strings produced by the compiled config's
    /// `validate()`.
    fn validate(&self) -> Vec<String> {
        self.inner.validate()
    }

    /// Extracts constraint values for `tool_name` from a Python argument dict.
    ///
    /// The dict is round-tripped through Python's `json.dumps` into a
    /// `serde_json::Value` before being handed to the core extractor, so
    /// anything `json.dumps` rejects raises here. Extraction failures are
    /// raised as `ValueError`.
    fn extract_constraints(
        &self,
        tool_name: &str,
        arguments: &Bound<'_, PyDict>,
    ) -> PyResult<PyExtractionResult> {
        let py = arguments.py();
        let serialized: String = py
            .import("json")?
            .getattr("dumps")?
            .call1((arguments,))?
            .extract()?;
        let parsed: serde_json::Value = serde_json::from_str(&serialized)
            .map_err(|e| PyValueError::new_err(format!("Invalid JSON arguments: {}", e)))?;
        let extracted = self
            .inner
            .extract_constraints(tool_name, &parsed)
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
        let constraints = PyDict::new(py);
        for (name, value) in extracted.constraints {
            constraints.set_item(name, constraint_value_to_py(py, &value)?)?;
        }
        Ok(PyExtractionResult {
            constraints: constraints.into(),
            tool: extracted.tool,
            warrant_base64: extracted.warrant_base64,
            signature_base64: extracted.signature_base64,
            approvals_base64: extracted.approvals_base64,
        })
    }
}
/// Python class `ExtractionResult`: read-only record produced by
/// `CompiledMcpConfig.extract_constraints`. Every field is exposed to Python
/// as a getter.
#[pyclass(name = "ExtractionResult")]
pub struct PyExtractionResult {
    /// Dict of extracted constraint values.
    #[pyo3(get)]
    constraints: PyObject,
    /// Tool name reported by the extractor.
    #[pyo3(get)]
    tool: String,
    /// Base64 warrant carried with the request, if any.
    #[pyo3(get)]
    warrant_base64: Option<String>,
    /// Base64 signature carried with the request, if any.
    #[pyo3(get)]
    signature_base64: Option<String>,
    /// Base64 approvals carried with the request.
    #[pyo3(get)]
    approvals_base64: Vec<String>,
}

#[pymethods]
impl PyExtractionResult {
    /// Short representation; appends ` +auth` when a warrant is attached.
    fn __repr__(&self) -> String {
        let auth_info = match &self.warrant_base64 {
            Some(_) => " +auth",
            None => " ",
        };
        format!(
            "ExtractionResult(tool='{}', constraints={{...}}{})",
            self.tool, auth_info
        )
    }
}
#[pyclass(name = "PublicKey")]
#[derive(Clone)]
pub struct PyPublicKey {
inner: RustPublicKey,
}
#[pymethods]
impl PyPublicKey {
#[staticmethod]
fn from_bytes(bytes: &[u8]) -> PyResult<Self> {
let arr: [u8; 32] = bytes
.try_into()
.map_err(|_| PyValueError::new_err("public key must be exactly 32 bytes"))?;
let inner = RustPublicKey::from_bytes(&arr).map_err(to_py_err)?;
Ok(Self { inner })
}
fn to_bytes(&self) -> Vec<u8> {
self.inner.to_bytes().to_vec()
}
fn __richcmp__(&self, other: &Self, op: pyo3::basic::CompareOp) -> PyResult<bool> {
match op {
pyo3::basic::CompareOp::Eq => Ok(self.inner == other.inner),
pyo3::basic::CompareOp::Ne => Ok(self.inner != other.inner),
_ => Err(pyo3::exceptions::PyTypeError::new_err(
"Comparison not supported",
)),
}
}
fn __repr__(&self) -> String {
let bytes = self.inner.to_bytes();
format!(
"PublicKey({:02x}{:02x}{:02x}{:02x}...)",
bytes[0], bytes[1], bytes[2], bytes[3]
)
}
#[staticmethod]
fn from_pem(pem: &str) -> PyResult<Self> {
let inner = RustPublicKey::from_pem(pem).map_err(to_py_err)?;
Ok(Self { inner })
}
fn to_pem(&self) -> String {
self.inner.to_pem()
}
}
#[pyclass(name = "Signature")]
pub struct PySignature {
inner: RustSignature,
}
#[pymethods]
impl PySignature {
#[staticmethod]
fn from_bytes(bytes: &[u8]) -> PyResult<Self> {
let arr: [u8; 64] = bytes
.try_into()
.map_err(|_| PyValueError::new_err("signature must be exactly 64 bytes"))?;
let inner = RustSignature::from_bytes(&arr).map_err(to_py_err)?;
Ok(Self { inner })
}
fn to_bytes(&self) -> Vec<u8> {
self.inner.to_bytes().to_vec()
}
fn __repr__(&self) -> String {
let bytes = self.inner.to_bytes();
format!("Signature({:02x}{:02x}...)", bytes[0], bytes[1])
}
}
/// Python class `ChainStep`: one entry of a chain verification result.
#[pyclass(name = "ChainStep")]
#[derive(Clone)]
pub struct PyChainStep {
    inner: RustChainStep,
}

#[pymethods]
impl PyChainStep {
    /// Property `warrant_id`: owned copy of the step's warrant id.
    #[getter]
    fn warrant_id(&self) -> String {
        let step = &self.inner;
        step.warrant_id.clone()
    }

    /// Property `depth`: the step's depth.
    #[getter]
    fn depth(&self) -> u32 {
        let step = &self.inner;
        step.depth
    }

    /// Property `issuer`: the step's issuer bytes.
    #[getter]
    fn issuer(&self) -> Vec<u8> {
        let step = &self.inner;
        step.issuer.to_vec()
    }

    /// Representation showing the id, depth and first issuer byte in hex.
    fn __repr__(&self) -> String {
        let step = &self.inner;
        let first_issuer_byte = step.issuer[0];
        format!(
            "ChainStep(warrant_id='{}', depth={}, issuer={:02x}...)",
            step.warrant_id, step.depth, first_issuer_byte
        )
    }
}
/// Python class `ChainVerificationResult`: read-only view of the core
/// chain verification result.
#[pyclass(name = "ChainVerificationResult")]
#[derive(Clone)]
pub struct PyChainVerificationResult {
    inner: RustChainVerificationResult,
}

#[pymethods]
impl PyChainVerificationResult {
    /// Property `root_issuer`: the root issuer bytes, or `None`.
    #[getter]
    fn root_issuer(&self) -> Option<Vec<u8>> {
        match self.inner.root_issuer {
            Some(raw) => Some(raw.to_vec()),
            None => None,
        }
    }

    /// Property `chain_length`.
    #[getter]
    fn chain_length(&self) -> usize {
        let result = &self.inner;
        result.chain_length
    }

    /// Property `leaf_depth`.
    #[getter]
    fn leaf_depth(&self) -> u32 {
        let result = &self.inner;
        result.leaf_depth
    }

    /// Property `verified_steps`: each recorded step cloned into a
    /// `ChainStep` wrapper.
    #[getter]
    fn verified_steps(&self) -> Vec<PyChainStep> {
        let mut steps = Vec::with_capacity(self.inner.verified_steps.len());
        for step in &self.inner.verified_steps {
            steps.push(PyChainStep {
                inner: step.clone(),
            });
        }
        steps
    }

    /// Property `warrant_stack_b64`: owned copy of the stored value, or
    /// `None`.
    #[getter]
    fn warrant_stack_b64(&self) -> Option<String> {
        let result = &self.inner;
        result.warrant_stack_b64.clone()
    }

    /// Representation showing chain length, leaf depth and step count.
    fn __repr__(&self) -> String {
        let result = &self.inner;
        let step_count = result.verified_steps.len();
        format!(
            "ChainVerificationResult(chain_length={}, leaf_depth={}, steps={})",
            result.chain_length, result.leaf_depth, step_count
        )
    }
}
/// Python class `tenuo.ApprovalPayload`: wraps the core approval payload.
#[pyclass(name = "ApprovalPayload", module = "tenuo")]
#[derive(Clone)]
pub struct PyApprovalPayload {
    inner: RustApprovalPayload,
}

#[pymethods]
impl PyApprovalPayload {
    /// Builds a payload with `version` fixed to 1.
    ///
    /// `extensions`, when given, must map `str` keys to byte values; any
    /// entry that fails extraction raises the extraction error.
    #[new]
    #[pyo3(signature = (request_hash, nonce, external_id, approved_at, expires_at, extensions=None))]
    fn new(
        request_hash: [u8; 32],
        nonce: [u8; 16],
        external_id: String,
        approved_at: u64,
        expires_at: u64,
        extensions: Option<&Bound<'_, PyDict>>,
    ) -> PyResult<Self> {
        let ext_map = match extensions {
            None => None,
            Some(dict) => {
                let entries = dict
                    .iter()
                    .map(|(key, value)| -> PyResult<(String, Vec<u8>)> {
                        Ok((key.extract()?, value.extract()?))
                    })
                    .collect::<PyResult<HashMap<_, _>>>()?;
                Some(entries)
            }
        };
        let inner = RustApprovalPayload {
            version: 1,
            request_hash,
            nonce,
            external_id,
            approved_at,
            expires_at,
            extensions: ext_map,
        };
        Ok(Self { inner })
    }

    /// Property `version`.
    #[getter]
    fn version(&self) -> u8 {
        let payload = &self.inner;
        payload.version
    }

    /// Property `request_hash` (32 bytes).
    #[getter]
    fn request_hash(&self) -> [u8; 32] {
        let payload = &self.inner;
        payload.request_hash
    }

    /// Property `nonce` (16 bytes).
    #[getter]
    fn nonce(&self) -> [u8; 16] {
        let payload = &self.inner;
        payload.nonce
    }

    /// Property `external_id`.
    #[getter]
    fn external_id(&self) -> &str {
        self.inner.external_id.as_str()
    }

    /// Property `approved_at`.
    #[getter]
    fn approved_at(&self) -> u64 {
        let payload = &self.inner;
        payload.approved_at
    }

    /// Property `expires_at`.
    #[getter]
    fn expires_at(&self) -> u64 {
        let payload = &self.inner;
        payload.expires_at
    }

    /// Representation showing the external id and both timestamps.
    fn __repr__(&self) -> String {
        let payload = &self.inner;
        format!(
            "ApprovalPayload(external_id='{}', approved_at={}, expires_at={})",
            payload.external_id, payload.approved_at, payload.expires_at
        )
    }
}
/// Python class `tenuo.SignedApproval`: wraps the core signed approval
/// envelope.
#[pyclass(name = "SignedApproval", module = "tenuo")]
#[derive(Clone)]
pub struct PySignedApproval {
    inner: RustSignedApproval,
}

#[pymethods]
impl PySignedApproval {
    /// Assembles an envelope from already-serialized parts without verifying
    /// it. Raises `ValueError` when `signature` is not 64 bytes or cannot be
    /// parsed.
    #[new]
    fn new(
        approval_version: u8,
        payload: &[u8],
        approver_key: &PyPublicKey,
        signature: &[u8],
    ) -> PyResult<Self> {
        let sig_bytes = <[u8; 64]>::try_from(signature).map_err(|_| {
            PyValueError::new_err(format!(
                "Invalid signature length: expected 64 bytes, got {}",
                signature.len()
            ))
        })?;
        let parsed = crate::crypto::Signature::from_bytes(&sig_bytes)
            .map_err(|e| PyValueError::new_err(format!("Invalid signature bytes: {}", e)))?;
        let inner = RustSignedApproval {
            approval_version,
            payload: payload.to_vec(),
            approver_key: approver_key.inner.clone(),
            signature: parsed,
        };
        Ok(Self { inner })
    }

    /// Static constructor: builds an envelope from a clone of `payload` and
    /// `keypair` using the core `SignedApproval::create`.
    #[staticmethod]
    fn create(payload: &PyApprovalPayload, keypair: &PySigningKey) -> Self {
        let inner = RustSignedApproval::create(payload.inner.clone(), &keypair.inner);
        Self { inner }
    }

    /// Runs the envelope's `verify()` and returns the resulting payload;
    /// failures are converted via `to_py_err`.
    fn verify(&self) -> PyResult<PyApprovalPayload> {
        self.inner
            .verify()
            .map(|inner| PyApprovalPayload { inner })
            .map_err(to_py_err)
    }

    /// Serializes the envelope with `ciborium`; failures raise `ValueError`.
    fn to_bytes(&self) -> PyResult<Vec<u8>> {
        let mut encoded = Vec::new();
        match ciborium::into_writer(&self.inner, &mut encoded) {
            Ok(()) => Ok(encoded),
            Err(e) => Err(PyValueError::new_err(format!(
                "Serialization failed: {}",
                e
            ))),
        }
    }

    /// Static constructor: deserializes an envelope with `ciborium`;
    /// failures raise `ValueError`.
    #[staticmethod]
    fn from_bytes(data: &[u8]) -> PyResult<Self> {
        let decoded: Result<RustSignedApproval, _> = ciborium::from_reader(data);
        match decoded {
            Ok(inner) => Ok(Self { inner }),
            Err(e) => Err(PyValueError::new_err(format!(
                "Deserialization failed: {}",
                e
            ))),
        }
    }

    /// Property `approval_version`.
    #[getter]
    fn approval_version(&self) -> u8 {
        let envelope = &self.inner;
        envelope.approval_version
    }

    /// Property `approver_key`: a `PublicKey` wrapper around a clone of the
    /// approver key.
    #[getter]
    fn approver_key(&self) -> PyPublicKey {
        let inner = self.inner.approver_key.clone();
        PyPublicKey { inner }
    }

    /// Property `payload`: the stored payload bytes.
    #[getter]
    fn payload(&self) -> &[u8] {
        self.inner.payload.as_slice()
    }

    /// Property `signature`: the raw signature bytes.
    #[getter]
    fn signature(&self) -> PyResult<Vec<u8>> {
        let raw = self.inner.signature.to_bytes();
        Ok(raw.to_vec())
    }

    /// Representation showing the version and the first eight approver key
    /// bytes in hex.
    fn __repr__(&self) -> String {
        let key_bytes = self.inner.approver_key.to_bytes();
        let key_prefix = hex::encode(&key_bytes[..8]);
        format!(
            "SignedApproval(version={}, approver={})",
            self.inner.approval_version, key_prefix
        )
    }
}
#[pyfunction]
#[pyo3(signature = (warrant_id, tool, args, holder=None))]
fn py_compute_request_hash(
warrant_id: &str,
tool: &str,
args: &Bound<'_, PyDict>,
holder: Option<&PyPublicKey>,
) -> PyResult<[u8; 32]> {
let mut rust_args = std::collections::HashMap::new();
for (key, value) in args.iter() {
let k: String = key.extract()?;
let v = py_to_constraint_value(&value)?;
rust_args.insert(k, v);
}
let holder_ref = holder.map(|h| &h.inner);
Ok(crate::approval::compute_request_hash(
warrant_id, tool, &rust_args, holder_ref,
))
}
/// Verifies signed approvals for a request and returns the accepted payloads.
///
/// Arguments:
/// * `request_hash` - must be exactly 32 bytes, else `ValueError`.
/// * `approvals` - candidate approvals; at most twice the number of trusted
///   approvers may be supplied, else `ValueError`.
/// * `trusted_approvers` - keys whose approvals count.
/// * `threshold` - number of distinct valid approvals required; must be
///   at least 1, else `ValueError`.
/// * `clock_tolerance_secs` - grace period added to each approval's
///   `expires_at` before it is considered expired.
///
/// Each approval is checked in order: signature, trusted approver, duplicate
/// approver, expiry, request-hash match. The function returns as soon as
/// `threshold` approvals have been accepted.
///
/// Errors (converted through `to_py_err`):
/// * `InvalidApproval` with a specific reason when exactly one approval was
///   supplied, `threshold` is 1, and that approval was rejected.
/// * `InsufficientApprovals` otherwise, with a per-reason rejection summary.
#[pyfunction(name = "verify_approvals")]
#[pyo3(signature = (request_hash, approvals, trusted_approvers, threshold=1, clock_tolerance_secs=30))]
fn py_verify_approvals(
    request_hash: &[u8],
    approvals: Vec<PySignedApproval>,
    trusted_approvers: Vec<PyPublicKey>,
    threshold: u32,
    clock_tolerance_secs: u64,
) -> PyResult<Vec<PyApprovalPayload>> {
    /// Why an individual approval was discarded.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum Rejection {
        InvalidSignature,
        NotTrusted,
        Duplicate,
        Expired,
        HashMismatch,
    }

    impl Rejection {
        /// Every variant, in the order used for the summary message.
        const ALL: [Rejection; 5] = [
            Rejection::InvalidSignature,
            Rejection::NotTrusted,
            Rejection::Duplicate,
            Rejection::Expired,
            Rejection::HashMismatch,
        ];

        /// Short label used in the aggregated rejection summary.
        fn label(self) -> &'static str {
            match self {
                Rejection::InvalidSignature => "invalid signature",
                Rejection::NotTrusted => "untrusted",
                Rejection::Duplicate => "duplicate",
                Rejection::Expired => "expired",
                Rejection::HashMismatch => "hash mismatch",
            }
        }

        /// Full sentence used when a single approval is rejected.
        fn reason(self) -> &'static str {
            match self {
                Rejection::InvalidSignature => "invalid signature on approval",
                Rejection::NotTrusted => "approver not in trusted set",
                Rejection::Duplicate => "duplicate approval from same approver",
                Rejection::Expired => "approval expired (beyond clock tolerance)",
                Rejection::HashMismatch => {
                    "request hash mismatch (approval was signed for a different request)"
                }
            }
        }
    }

    let hash_arr: [u8; 32] = request_hash.try_into().map_err(|_| {
        PyValueError::new_err(format!(
            "request_hash must be 32 bytes, got {}",
            request_hash.len()
        ))
    })?;
    if threshold == 0 {
        return Err(PyValueError::new_err("threshold must be >= 1"));
    }
    // Bound the amount of signature verification work a caller can trigger.
    let max_approvals = trusted_approvers.len().saturating_mul(2);
    if approvals.len() > max_approvals {
        return Err(PyValueError::new_err(format!(
            "too many approvals: {} provided, max {} (2× trusted_approvers)",
            approvals.len(),
            max_approvals
        )));
    }

    // A negative (pre-epoch) clock reading is treated as 0 rather than being
    // wrapped into a huge value by an `as` cast.
    let now = u64::try_from(chrono::Utc::now().timestamp()).unwrap_or(0);
    let mut valid_payloads = Vec::new();
    let mut seen_approvers = std::collections::HashSet::new();
    let mut rejections: Vec<Rejection> = Vec::new();

    for approval in &approvals {
        let payload = match approval.inner.verify() {
            Ok(p) => p,
            Err(_) => {
                rejections.push(Rejection::InvalidSignature);
                continue;
            }
        };
        if !trusted_approvers
            .iter()
            .any(|k| k.inner == approval.inner.approver_key)
        {
            rejections.push(Rejection::NotTrusted);
            continue;
        }
        let key_bytes = approval.inner.approver_key.to_bytes();
        if seen_approvers.contains(&key_bytes) {
            rejections.push(Rejection::Duplicate);
            continue;
        }
        // `expires_at` comes from the signed payload and may be arbitrarily
        // large (e.g. u64::MAX for "never"); a plain `+` would overflow,
        // panicking in debug builds and wrapping to "expired" in release.
        if payload.expires_at.saturating_add(clock_tolerance_secs) < now {
            rejections.push(Rejection::Expired);
            continue;
        }
        if payload.request_hash != hash_arr {
            rejections.push(Rejection::HashMismatch);
            continue;
        }
        seen_approvers.insert(key_bytes);
        valid_payloads.push(PyApprovalPayload { inner: payload });
        if valid_payloads.len() >= threshold as usize {
            return Ok(valid_payloads);
        }
    }

    // Single-approval case: report the precise reason instead of a summary.
    if threshold == 1 && approvals.len() == 1 {
        if let [only] = rejections.as_slice() {
            return Err(to_py_err(crate::error::Error::InvalidApproval(
                only.reason().to_string(),
            )));
        }
    }

    let parts: Vec<String> = Rejection::ALL
        .iter()
        .filter_map(|kind| {
            let count = rejections.iter().filter(|r| *r == kind).count();
            (count > 0).then(|| format!("{count} {}", kind.label()))
        })
        .collect();
    let detail = if parts.is_empty() {
        None
    } else {
        Some(format!(" [rejected: {}]", parts.join(", ")))
    };
    Err(to_py_err(crate::error::Error::InsufficientApprovals {
        required: threshold,
        // Fewer than `threshold` (a u32) payloads were accepted, so this fits.
        received: u32::try_from(valid_payloads.len()).unwrap_or(u32::MAX),
        detail,
    }))
}
/// Evaluates the warrant's approval gates for a `(tool, args)` call.
///
/// The gate map is parsed from the warrant extension stored under
/// `APPROVAL_GATE_EXTENSION_KEY`; parse and evaluation errors are converted
/// via `to_py_err`. Returns the boolean produced by the core
/// `evaluate_approval_gates`.
#[pyfunction(name = "evaluate_approval_gates")]
fn py_evaluate_approval_gates(
    warrant: &PyWarrant,
    tool: &str,
    args: &Bound<'_, PyDict>,
) -> PyResult<bool> {
    let rust_args = args
        .iter()
        .map(|(key, value)| -> PyResult<_> {
            Ok((key.extract::<String>()?, py_to_constraint_value(&value)?))
        })
        .collect::<PyResult<HashMap<_, _>>>()?;
    let raw_gates = warrant
        .inner
        .extension(crate::approval_gate::APPROVAL_GATE_EXTENSION_KEY);
    let gate_map =
        crate::approval_gate::parse_approval_gate_map(raw_gates).map_err(to_py_err)?;
    crate::approval_gate::evaluate_approval_gates(gate_map.as_ref(), tool, &rust_args)
        .map_err(to_py_err)
}
/// Python class `ApprovalMetadata`: a provider name plus an optional reason.
#[pyclass(name = "ApprovalMetadata")]
#[derive(Clone)]
pub struct PyApprovalMetadata {
    inner: RustApprovalMetadata,
}

#[pymethods]
impl PyApprovalMetadata {
    /// Creates metadata from `provider` and an optional `reason`.
    #[new]
    #[pyo3(signature = (provider, reason=None))]
    fn new(provider: String, reason: Option<String>) -> Self {
        let inner = RustApprovalMetadata { provider, reason };
        Self { inner }
    }

    /// Property `provider`.
    #[getter]
    fn provider(&self) -> &str {
        self.inner.provider.as_str()
    }

    /// Property `reason`, or `None` when no reason was supplied.
    #[getter]
    fn reason(&self) -> Option<&str> {
        match &self.inner.reason {
            Some(text) => Some(text.as_str()),
            None => None,
        }
    }

    /// Representation; the `reason` part is omitted when absent.
    fn __repr__(&self) -> String {
        let provider = &self.inner.provider;
        match self.inner.reason.as_deref() {
            Some(reason) => format!(
                "ApprovalMetadata(provider='{}', reason='{}')",
                provider, reason
            ),
            None => format!("ApprovalMetadata(provider='{}')", provider),
        }
    }
}
#[pyclass(name = "Authorizer")]
pub struct PyAuthorizer {
inner: RustAuthorizer,
}
#[pymethods]
impl PyAuthorizer {
#[new]
#[pyo3(signature = (trusted_roots=None, clock_tolerance_secs=30, pop_window_secs=30, pop_max_windows=5))]
fn new(
trusted_roots: Option<Vec<PyRef<PyPublicKey>>>,
clock_tolerance_secs: i64,
pop_window_secs: i64,
pop_max_windows: u32,
) -> PyResult<Self> {
let mut authorizer = RustAuthorizer::new()
.with_clock_tolerance(chrono::Duration::seconds(clock_tolerance_secs))
.with_pop_window(pop_window_secs, pop_max_windows);
if let Some(roots) = trusted_roots {
for key in roots {
authorizer = authorizer.with_trusted_root(key.inner.clone());
}
}
Ok(Self { inner: authorizer })
}
fn add_trusted_root(&mut self, key: &PyPublicKey) {
self.inner.add_trusted_root(key.inner.clone());
}
fn set_clock_tolerance(&mut self, seconds: i64) {
self.inner
.set_clock_tolerance(chrono::Duration::seconds(seconds));
}
fn set_pop_window(&mut self, window_secs: i64, max_windows: u32) {
self.inner.set_pop_window(window_secs, max_windows);
}
fn require_clearance(&mut self, tool: String, level: &PyClearance) -> PyResult<()> {
self.inner
.require_clearance(&tool, level.inner)
.map_err(to_py_err)
}
fn get_required_clearance(&self, tool: String) -> Option<PyClearance> {
self.inner
.get_required_clearance(&tool)
.map(|tl| PyClearance { inner: tl })
}
fn pop_window_config(&self) -> (i64, u32) {
self.inner.pop_window_config()
}
fn pop_validity_secs(&self) -> i64 {
self.inner.pop_validity_secs()
}
fn trusted_root_count(&self) -> usize {
self.inner.trusted_root_count()
}
#[pyo3(signature = (warrant, tool, args, signature=None, approvals=None))]
fn authorize(
&self,
warrant: &PyWarrant,
tool: &str,
args: &Bound<'_, PyDict>,
signature: Option<&[u8]>,
approvals: Option<Vec<PyRef<PySignedApproval>>>,
) -> PyResult<()> {
let mut rust_args = HashMap::new();
for (key, value) in args.iter() {
let field: String = key.extract()?;
let cv = py_to_constraint_value(&value)?;
rust_args.insert(field, cv);
}
let sig = match signature {
Some(bytes) => {
let arr: [u8; 64] = bytes
.try_into()
.map_err(|_| PyValueError::new_err("signature must be exactly 64 bytes"))?;
Some(RustSignature::from_bytes(&arr).map_err(to_py_err)?)
}
None => None,
};
let rust_approvals: Vec<RustSignedApproval> = approvals
.unwrap_or_default()
.iter()
.map(|a| a.inner.clone())
.collect();
self.inner
.authorize_one(
&warrant.inner,
tool,
&rust_args,
sig.as_ref(),
&rust_approvals,
)
.map(|_| ())
.map_err(to_py_err)
}
#[pyo3(signature = (warrant, tool, args, signature=None, approvals=None))]
fn check(
&self,
warrant: &PyWarrant,
tool: &str,
args: &Bound<'_, PyDict>,
signature: Option<&[u8]>,
approvals: Option<Vec<PyRef<PySignedApproval>>>,
) -> PyResult<()> {
self.authorize(warrant, tool, args, signature, approvals)
}
fn verify(&self, warrant: &PyWarrant) -> PyResult<()> {
self.inner
.verify_chain(std::slice::from_ref(&warrant.inner))
.map(|_| ())
.map_err(to_py_err)
}
#[pyo3(signature = (warrant, tool, args, signature=None, approvals=None))]
fn authorize_one(
&self,
warrant: &PyWarrant,
tool: &str,
args: &Bound<'_, PyDict>,
signature: Option<&[u8]>,
approvals: Option<Vec<PyRef<PySignedApproval>>>,
) -> PyResult<PyChainVerificationResult> {
let mut rust_args = HashMap::new();
for (key, value) in args.iter() {
let field: String = key.extract()?;
let cv = py_to_constraint_value(&value)?;
rust_args.insert(field, cv);
}
let sig = match signature {
Some(bytes) => {
let arr: [u8; 64] = bytes
.try_into()
.map_err(|_| PyValueError::new_err("signature must be exactly 64 bytes"))?;
Some(RustSignature::from_bytes(&arr).map_err(to_py_err)?)
}
None => None,
};
let rust_approvals: Vec<RustSignedApproval> = approvals
.unwrap_or_default()
.iter()
.map(|a| a.inner.clone())
.collect();
let result = self
.inner
.authorize_one(
&warrant.inner,
tool,
&rust_args,
sig.as_ref(),
&rust_approvals,
)
.map_err(to_py_err)?;
Ok(PyChainVerificationResult { inner: result })
}
fn verify_chain(&self, chain: &Bound<'_, PySequence>) -> PyResult<PyChainVerificationResult> {
let len = chain.len()?;
if len == 0 {
return Err(PyValueError::new_err("chain cannot be empty"));
}
let mut warrants = Vec::with_capacity(len);
for i in 0..len {
let item = chain.get_item(i)?;
let warrant_bound = item.downcast::<PyWarrant>()?;
let warrant = warrant_bound.borrow();
warrants.push(warrant.inner.clone());
}
let result = self.inner.verify_chain(&warrants).map_err(to_py_err)?;
Ok(PyChainVerificationResult { inner: result })
}
#[pyo3(signature = (chain, tool, args, signature=None, approvals=None))]
fn check_chain(
&self,
chain: &Bound<'_, PySequence>,
tool: &str,
args: &Bound<'_, PyDict>,
signature: Option<&[u8]>,
approvals: Option<Vec<PyRef<PySignedApproval>>>,
) -> PyResult<PyChainVerificationResult> {
let len = chain.len()?;
if len == 0 {
return Err(PyValueError::new_err("chain cannot be empty"));
}
let mut warrants = Vec::with_capacity(len);
for i in 0..len {
let item = chain.get_item(i)?;
let warrant_bound = item.downcast::<PyWarrant>()?;
let warrant = warrant_bound.borrow();
warrants.push(warrant.inner.clone());
}
let mut rust_args = HashMap::new();
for (key, value) in args.iter() {
let field: String = key.extract()?;
let cv = py_to_constraint_value(&value)?;
rust_args.insert(field, cv);
}
let sig = match signature {
Some(bytes) => {
let arr: [u8; 64] = bytes
.try_into()
.map_err(|_| PyValueError::new_err("signature must be exactly 64 bytes"))?;
Some(RustSignature::from_bytes(&arr).map_err(to_py_err)?)
}
None => None,
};
let rust_approvals: Vec<crate::approval::SignedApproval> = approvals
.unwrap_or_default()
.iter()
.map(|a| a.inner.clone())
.collect();
let result = self
.inner
.check_chain(&warrants, tool, &rust_args, sig.as_ref(), &rust_approvals)
.map_err(to_py_err)?;
Ok(PyChainVerificationResult { inner: result })
}
}
/// Python class `RevocationRequest`: wraps the core revocation request.
#[pyclass(name = "RevocationRequest")]
#[derive(Clone)]
pub struct PyRevocationRequest {
    inner: crate::revocation::RevocationRequest,
}

#[pymethods]
impl PyRevocationRequest {
    /// Static constructor: creates a request for `warrant_id` with `reason`,
    /// using `requestor_keypair`. Core errors are converted via `to_py_err`.
    #[staticmethod]
    fn new(warrant_id: &str, reason: &str, requestor_keypair: &PySigningKey) -> PyResult<Self> {
        crate::revocation::RevocationRequest::new(warrant_id, reason, &requestor_keypair.inner)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// Runs the request's `verify_signature()`; failures are converted via
    /// `to_py_err`.
    fn verify_signature(&self) -> PyResult<()> {
        let outcome = self.inner.verify_signature();
        outcome.map_err(to_py_err)
    }

    /// Property `warrant_id`.
    #[getter]
    fn warrant_id(&self) -> &str {
        let request = &self.inner;
        &request.warrant_id
    }

    /// Property `reason`.
    #[getter]
    fn reason(&self) -> &str {
        let request = &self.inner;
        &request.reason
    }

    /// Property `requestor`: a `PublicKey` wrapper around a clone of the
    /// requestor key.
    #[getter]
    fn requestor(&self) -> PyPublicKey {
        let inner = self.inner.requestor.clone();
        PyPublicKey { inner }
    }

    /// Property `requested_at`, formatted as an RFC 3339 string.
    #[getter]
    fn requested_at(&self) -> String {
        let stamp = &self.inner.requested_at;
        stamp.to_rfc3339()
    }

    /// Serializes the request; failures are converted via `to_py_err`.
    fn to_bytes(&self) -> PyResult<Vec<u8>> {
        let encoded = self.inner.to_bytes();
        encoded.map_err(to_py_err)
    }

    /// Static constructor: deserializes a request; failures are converted
    /// via `to_py_err`.
    #[staticmethod]
    fn from_bytes(bytes: &[u8]) -> PyResult<Self> {
        crate::revocation::RevocationRequest::from_bytes(bytes)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// Representation showing the warrant id, reason and hex requestor key.
    fn __repr__(&self) -> String {
        let request = &self.inner;
        let requestor_hex = hex::encode(request.requestor.to_bytes());
        format!(
            "RevocationRequest(warrant_id='{}', reason='{}', requestor={})",
            request.warrant_id, request.reason, requestor_hex
        )
    }
}
/// Python class `SignedRevocationList`: wraps the core signed revocation
/// list.
#[pyclass(name = "SignedRevocationList")]
#[derive(Clone)]
pub struct PySignedRevocationList {
    inner: crate::revocation::SignedRevocationList,
}

#[pymethods]
impl PySignedRevocationList {
    /// Static method: returns a fresh `SrlBuilder`.
    #[staticmethod]
    fn builder() -> PySrlBuilder {
        let inner = crate::revocation::SignedRevocationList::builder();
        PySrlBuilder { inner }
    }

    /// Static constructor: builds an empty list using `keypair`; failures are
    /// converted via `to_py_err`.
    #[staticmethod]
    fn empty(keypair: &PySigningKey) -> PyResult<Self> {
        crate::revocation::SignedRevocationList::empty(&keypair.inner)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// Verifies the list against `expected_issuer`; failures are converted
    /// via `to_py_err`.
    fn verify(&self, expected_issuer: &PyPublicKey) -> PyResult<()> {
        let outcome = self.inner.verify(&expected_issuer.inner);
        outcome.map_err(to_py_err)
    }

    /// Returns the list's `is_revoked(warrant_id)` result.
    fn is_revoked(&self, warrant_id: &str) -> bool {
        self.inner.is_revoked(warrant_id)
    }

    /// Property `version`.
    #[getter]
    fn version(&self) -> u64 {
        self.inner.version()
    }

    /// Property `issued_at`, formatted as an RFC 3339 string.
    #[getter]
    fn issued_at(&self) -> String {
        let stamp = self.inner.issued_at();
        stamp.to_rfc3339()
    }

    /// Property `issuer`: a `PublicKey` wrapper around a clone of the issuer
    /// key.
    #[getter]
    fn issuer(&self) -> PyPublicKey {
        let inner = self.inner.issuer().clone();
        PyPublicKey { inner }
    }

    /// Property `revoked_ids`: an owned copy of the revoked warrant ids.
    #[getter]
    fn revoked_ids(&self) -> Vec<String> {
        let ids = self.inner.revoked_ids();
        ids.to_vec()
    }

    /// `len(list)`: the list's `len()`.
    fn __len__(&self) -> usize {
        self.inner.len()
    }

    /// Serializes the list; failures are converted via `to_py_err`.
    fn to_bytes(&self) -> PyResult<Vec<u8>> {
        let encoded = self.inner.to_bytes();
        encoded.map_err(to_py_err)
    }

    /// Static constructor: deserializes a list; failures are converted via
    /// `to_py_err`.
    #[staticmethod]
    fn from_bytes(bytes: &[u8]) -> PyResult<Self> {
        crate::revocation::SignedRevocationList::from_bytes(bytes)
            .map(|inner| Self { inner })
            .map_err(to_py_err)
    }

    /// Representation showing the version, entry count and hex issuer key.
    fn __repr__(&self) -> String {
        let list = &self.inner;
        let issuer_hex = hex::encode(list.issuer().to_bytes());
        format!(
            "SignedRevocationList(version={}, count={}, issuer={})",
            list.version(),
            list.len(),
            issuer_hex
        )
    }
}
/// Python class `SrlBuilder`: mutable wrapper around the core, by-value
/// revocation list builder.
///
/// The core builder methods consume `self`, so every method here takes the
/// current builder out with `std::mem::take` (leaving a default builder in
/// its place) and stores the returned builder back.
#[pyclass(name = "SrlBuilder")]
pub struct PySrlBuilder {
    inner: crate::revocation::SrlBuilder,
}

#[pymethods]
impl PySrlBuilder {
    /// Adds `warrant_id` via the core builder's `revoke`.
    fn revoke(&mut self, warrant_id: &str) {
        let current = std::mem::take(&mut self.inner);
        self.inner = current.revoke(warrant_id);
    }

    /// Adds every id in `ids` via the core builder's `revoke_all`.
    fn revoke_all(&mut self, ids: Vec<String>) {
        let current = std::mem::take(&mut self.inner);
        self.inner = current.revoke_all(ids);
    }

    /// Sets the list version via the core builder's `version`.
    fn version(&mut self, version: u64) {
        let current = std::mem::take(&mut self.inner);
        self.inner = current.version(version);
    }

    /// Applies the core builder's `from_existing` with `existing`.
    #[allow(clippy::wrong_self_convention)]
    fn from_existing(&mut self, existing: &PySignedRevocationList) {
        let current = std::mem::take(&mut self.inner);
        self.inner = current.from_existing(&existing.inner);
    }

    /// Builds the list with `keypair`. The builder is reset to its default
    /// state whether or not the build succeeds; failures are converted via
    /// `to_py_err`.
    fn build(&mut self, keypair: &PySigningKey) -> PyResult<PySignedRevocationList> {
        let current = std::mem::take(&mut self.inner);
        current
            .build(&keypair.inner)
            .map(|inner| PySignedRevocationList { inner })
            .map_err(to_py_err)
    }
}
/// Populates the Python extension module `m` with every class, constant and
/// function exported by this binding layer.
///
/// Each class is registered exactly once (`PyWarrantType` was previously
/// added a second time further down the list; the redundant call has been
/// dropped). The control-plane classes are only registered when the
/// `python-server` feature is enabled.
pub fn tenuo_core(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Enums and constraint types.
    m.add_class::<PyWarrantType>()?;
    m.add_class::<PyClearance>()?;
    m.add_class::<PyPattern>()?;
    m.add_class::<PyExact>()?;
    m.add_class::<PyOneOf>()?;
    m.add_class::<PyRange>()?;
    m.add_class::<PyCidr>()?;
    m.add_class::<PyUrlPattern>()?;
    m.add_class::<PyRegex>()?;
    m.add_class::<PyWildcard>()?;
    m.add_class::<PySubpath>()?;
    m.add_class::<PyUrlSafe>()?;
    m.add_class::<PyShlex>()?;
    m.add_class::<PyCel>()?;
    m.add_class::<PyNotOneOf>()?;
    m.add_class::<PyContains>()?;
    m.add_class::<PySubset>()?;
    m.add_class::<PyAll>()?;
    m.add_class::<PyAnyOf>()?;
    m.add_class::<PyNot>()?;

    // Keys, signatures, warrants and builders.
    m.add_class::<PySigningKey>()?;
    m.add_class::<PyPublicKey>()?;
    m.add_class::<PySignature>()?;
    m.add_class::<PyWarrant>()?;
    m.add_class::<PyAttenuationBuilder>()?;
    m.add_class::<PyIssuanceBuilder>()?;

    // Delegation diff types.
    m.add_class::<PyChangeType>()?;
    m.add_class::<PyToolsDiff>()?;
    m.add_class::<PyConstraintDiff>()?;
    m.add_class::<PyTtlDiff>()?;
    m.add_class::<PyClearanceDiff>()?;
    m.add_class::<PyDepthDiff>()?;
    m.add_class::<PyDelegationDiff>()?;
    m.add_class::<PyDelegationReceipt>()?;

    // MCP configuration, authorization, approvals and revocation.
    m.add_class::<PyMcpConfig>()?;
    m.add_class::<PyCompiledMcpConfig>()?;
    m.add_class::<PyAuthorizer>()?;
    m.add_class::<PyChainStep>()?;
    m.add_class::<PyChainVerificationResult>()?;
    m.add_class::<PyExtractionResult>()?;
    m.add_class::<PyApprovalPayload>()?;
    m.add_class::<PySignedApproval>()?;
    m.add_class::<PyApprovalMetadata>()?;
    m.add_class::<PyRevocationRequest>()?;
    m.add_class::<PySignedRevocationList>()?;
    m.add_class::<PySrlBuilder>()?;

    #[cfg(feature = "python-server")]
    m.add_class::<crate::python_control_plane::PyControlPlaneClient>()?;
    #[cfg(feature = "python-server")]
    m.add_class::<crate::python_control_plane::PyConnectToken>()?;

    // Crate-level constants.
    m.add("MAX_DELEGATION_DEPTH", crate::MAX_DELEGATION_DEPTH)?;
    m.add("MAX_WARRANT_SIZE", crate::MAX_WARRANT_SIZE)?;
    m.add("MAX_WARRANT_TTL_SECS", crate::MAX_WARRANT_TTL_SECS)?;
    m.add("DEFAULT_WARRANT_TTL_SECS", crate::DEFAULT_WARRANT_TTL_SECS)?;
    m.add("WIRE_VERSION", crate::WIRE_VERSION)?;
    m.add("WARRANT_HEADER", wire::WARRANT_HEADER)?;

    // Module-level functions.
    m.add_function(wrap_pyfunction!(py_compute_diff, m)?)?;
    m.add_function(wrap_pyfunction!(py_decode_warrant_stack_base64, m)?)?;
    m.add_function(wrap_pyfunction!(py_encode_warrant_stack, m)?)?;
    m.add_function(wrap_pyfunction!(py_compute_request_hash, m)?)?;
    m.add_function(wrap_pyfunction!(py_verify_approvals, m)?)?;
    m.add_function(wrap_pyfunction!(py_evaluate_approval_gates, m)?)?;
    Ok(())
}
#[pyfunction(name = "decode_warrant_stack_base64")]
fn py_decode_warrant_stack_base64(s: &str) -> PyResult<Vec<PyWarrant>> {
use crate::wire;
use base64::Engine;
let stack = wire::decode_pem_chain(s)
.or_else(|err| {
match base64::engine::general_purpose::STANDARD.decode(s.trim()) {
Ok(bytes) => wire::decode_stack(&bytes),
Err(_) => Err(err), }
})
.map_err(to_py_err)?;
Ok(stack
.0
.into_iter()
.map(|w| PyWarrant { inner: w })
.collect())
}
/// Encodes `warrants` as a standard-base64 warrant stack.
///
/// Returns `None` for an empty list, and also when `wire::encode_stack`
/// fails (the encoding error is discarded rather than raised).
#[pyfunction(name = "encode_warrant_stack")]
fn py_encode_warrant_stack(warrants: Vec<PyRef<PyWarrant>>) -> Option<String> {
    use base64::Engine;
    if warrants.is_empty() {
        return None;
    }
    let mut rust_warrants: Vec<crate::warrant::Warrant> = Vec::with_capacity(warrants.len());
    for wrapper in warrants.iter() {
        rust_warrants.push(wrapper.inner.clone());
    }
    let stack = wire::WarrantStack(rust_warrants);
    match wire::encode_stack(&stack) {
        Ok(encoded) => Some(base64::engine::general_purpose::STANDARD.encode(encoded)),
        Err(_) => None,
    }
}
/// Returns the result of `crate::diff::compute_diff(parent, child)` wrapped
/// for Python.
#[pyfunction(name = "compute_diff")]
fn py_compute_diff(parent: &PyWarrant, child: &PyWarrant) -> PyDelegationDiff {
    let inner = crate::diff::compute_diff(&parent.inner, &child.inner);
    PyDelegationDiff { inner }
}