use crate::{
agent_key::{AgentKey, AgentMetadata, CertChain},
caveats::{Caveats, CountBound, Scope},
envelope::{Recipient, SignedEnvelope},
fingerprint::Fingerprint,
github_binding::{ssh_pubkey_ed25519_bytes, GitHubBinding},
user_key::{UserKey, UserPublic},
MeshError,
};
use ed25519_dalek::Signature;
use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyModule, PyString, PyType};
use ssh_key::PrivateKey as SshPrivateKey;
use std::path::PathBuf;
// Declares the Python exception class `PyMeshError`, derived from Python's built-in
// `Exception` and declared under the `_agent_mesh` module name. `register` exports it
// to Python under the attribute name `MeshError`.
create_exception!(_agent_mesh, PyMeshError, PyException);
/// Converts a crate-level `MeshError` into a Python `PyMeshError`.
///
/// Only the error's `to_string()` text is carried over, as the exception message.
fn mesh_err_to_py(e: MeshError) -> PyErr {
    let message = e.to_string();
    PyMeshError::new_err(message)
}
/// Wrapper around the native `Fingerprint`, exposed to Python as `Fingerprint`
/// in `agent_mesh._agent_mesh.core`. The class is declared `frozen`.
#[pyclass(
    name = "Fingerprint",
    module = "agent_mesh._agent_mesh.core",
    frozen,
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyFingerprint {
    /// The wrapped native fingerprint value.
    pub inner: Fingerprint,
}

#[pymethods]
impl PyFingerprint {
    /// Parses a fingerprint from the string `s` using `Fingerprint`'s `FromStr` impl.
    ///
    /// Raises `MeshError` when parsing fails.
    #[classmethod]
    fn from_hex(_cls: &Bound<'_, PyType>, s: &str) -> PyResult<Self> {
        s.parse::<Fingerprint>()
            .map(|inner| Self { inner })
            .map_err(mesh_err_to_py)
    }

    /// Wraps exactly 32 raw bytes as a fingerprint, without transforming them.
    ///
    /// Raises `MeshError` when `data` is not 32 bytes long.
    #[classmethod]
    fn from_bytes(_cls: &Bound<'_, PyType>, data: &[u8]) -> PyResult<Self> {
        if data.len() == 32 {
            let mut raw = [0u8; 32];
            raw.copy_from_slice(data);
            return Ok(Self {
                inner: Fingerprint(raw),
            });
        }
        Err(PyMeshError::new_err(format!(
            "expected 32 bytes, got {}",
            data.len()
        )))
    }

    /// Builds a fingerprint by handing `data` to `Fingerprint::of_bytes`.
    #[classmethod]
    fn of_bytes(_cls: &Bound<'_, PyType>, data: &[u8]) -> Self {
        let inner = Fingerprint::of_bytes(data);
        Self { inner }
    }

    /// Returns the string produced by `Fingerprint::hex`.
    fn hex(&self) -> String {
        self.inner.hex()
    }

    /// Returns the string produced by `Fingerprint::short`.
    fn short(&self) -> String {
        self.inner.short()
    }

    /// Two fingerprints are equal when their wrapped values compare equal.
    fn __eq__(&self, other: &Self) -> bool {
        self.inner == other.inner
    }

    /// Hash value: the first eight fingerprint bytes read as a little-endian `u64`.
    fn __hash__(&self) -> u64 {
        let raw = &self.inner.0;
        u64::from_le_bytes([
            raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
        ])
    }

    /// `str()` yields the same text as `short()`.
    fn __str__(&self) -> String {
        self.inner.short()
    }

    /// `repr()` yields `Fingerprint('<hex>')`, using the `hex()` text.
    fn __repr__(&self) -> String {
        let hex = self.inner.hex();
        format!("Fingerprint('{}')", hex)
    }
}
/// Wrapper around the native `UserPublic`, exposed to Python as `UserPublic`
/// in `agent_mesh._agent_mesh.core`. The class is declared `frozen`.
#[pyclass(
    name = "UserPublic",
    module = "agent_mesh._agent_mesh.core",
    frozen,
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyUserPublic {
    /// The wrapped native public key.
    pub inner: UserPublic,
}

#[pymethods]
impl PyUserPublic {
    /// Returns the fingerprint reported by `UserPublic::fingerprint`.
    fn fingerprint(&self) -> PyFingerprint {
        let inner = self.inner.fingerprint();
        PyFingerprint { inner }
    }

    /// Checks `signature` over `message` with `UserPublic::verify`.
    ///
    /// `signature` must be exactly 64 bytes. Raises `MeshError` on a wrong
    /// length or when the underlying verification returns an error.
    fn verify(&self, message: &[u8], signature: &[u8]) -> PyResult<()> {
        if signature.len() == 64 {
            let mut raw = [0u8; 64];
            raw.copy_from_slice(signature);
            return self
                .inner
                .verify(message, &Signature::from_bytes(&raw))
                .map_err(mesh_err_to_py);
        }
        Err(PyMeshError::new_err(format!(
            "expected 64-byte signature, got {}",
            signature.len()
        )))
    }

    /// Returns the output of `UserPublic::as_bytes` as a Python `bytes` object.
    fn as_bytes<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = self.inner.as_bytes();
        PyBytes::new(py, &raw)
    }
}
/// Wrapper around the native `UserKey`, exposed to Python as `UserKey`
/// in `agent_mesh._agent_mesh.core`.
#[pyclass(name = "UserKey", module = "agent_mesh._agent_mesh.core")]
pub struct PyUserKey {
    /// The wrapped native user key.
    pub inner: UserKey,
}

#[pymethods]
impl PyUserKey {
    /// Creates a new key with `UserKey::generate`.
    #[classmethod]
    fn generate(_cls: &Bound<'_, PyType>) -> Self {
        let inner = UserKey::generate();
        Self { inner }
    }

    /// Loads a key from `path` with `UserKey::load`.
    ///
    /// Raises `MeshError` when loading fails.
    #[classmethod]
    fn load(_cls: &Bound<'_, PyType>, path: PathBuf) -> PyResult<Self> {
        UserKey::load(&path)
            .map(|inner| Self { inner })
            .map_err(mesh_err_to_py)
    }

    /// Returns the fingerprint reported by `UserKey::fingerprint`.
    fn fingerprint(&self) -> PyFingerprint {
        let inner = self.inner.fingerprint();
        PyFingerprint { inner }
    }

    /// Returns the public half reported by `UserKey::public`.
    fn public(&self) -> PyUserPublic {
        let inner = self.inner.public();
        PyUserPublic { inner }
    }

    /// Signs `message` with `UserKey::sign` and returns the signature bytes.
    fn sign<'py>(&self, py: Python<'py>, message: &[u8]) -> Bound<'py, PyBytes> {
        PyBytes::new(py, &self.inner.sign(message).to_bytes())
    }

    /// Saves the key to `path` with `UserKey::save`.
    ///
    /// Raises `MeshError` when saving fails.
    fn save(&self, path: PathBuf) -> PyResult<()> {
        let outcome = self.inner.save(&path);
        outcome.map_err(mesh_err_to_py)
    }
}
/// Maps an optional list onto a `Scope`: `None` becomes `Scope::All`, and
/// `Some(items)` becomes `Scope::only(items)`.
fn scope_from_opt<T: Ord + Clone>(opt: Option<Vec<T>>) -> Scope<T> {
    let Some(items) = opt else {
        return Scope::All;
    };
    Scope::only(items)
}
/// Inverse of `scope_from_opt`: `Scope::All` becomes `None`, and `Scope::Only`
/// becomes a `Vec` holding clones of its members in the set's iteration order.
fn opt_from_scope<T: Ord + Clone>(scope: &Scope<T>) -> Option<Vec<T>> {
    if let Scope::Only(members) = scope {
        Some(members.iter().cloned().collect())
    } else {
        None
    }
}
/// Wrapper around the native `Caveats`, exposed to Python as `Caveats`
/// in `agent_mesh._agent_mesh.core`. The class is declared `frozen`.
#[pyclass(
    name = "Caveats",
    module = "agent_mesh._agent_mesh.core",
    frozen,
    from_py_object
)]
#[derive(Clone)]
pub struct PyCaveats {
    /// The wrapped native caveat set.
    pub inner: Caveats,
}

#[pymethods]
impl PyCaveats {
    /// Builds a caveat set from optional keyword arguments.
    ///
    /// For each scope argument (`fs_read`, `fs_write`, `exec`, `net`,
    /// `valid_for_generation`), `None` maps to `Scope::All` and a list maps to
    /// `Scope::only(list)`. `max_calls=None` maps to `CountBound::Unlimited`,
    /// and a number `n` maps to `CountBound::AtMost(n)`.
    #[new]
    #[pyo3(signature = (
        fs_read = None,
        fs_write = None,
        exec = None,
        net = None,
        max_calls = None,
        valid_for_generation = None,
    ))]
    fn new(
        fs_read: Option<Vec<String>>,
        fs_write: Option<Vec<String>>,
        exec: Option<Vec<String>>,
        net: Option<Vec<String>>,
        max_calls: Option<u64>,
        valid_for_generation: Option<Vec<u64>>,
    ) -> Self {
        // Shadow each argument with its native form so the struct literal stays short.
        let fs_read = scope_from_opt(fs_read);
        let fs_write = scope_from_opt(fs_write);
        let exec = scope_from_opt(exec);
        let net = scope_from_opt(net);
        let max_calls = max_calls.map_or(CountBound::Unlimited, CountBound::AtMost);
        let valid_for_generation = scope_from_opt(valid_for_generation);
        Self {
            inner: Caveats {
                fs_read,
                fs_write,
                exec,
                net,
                max_calls,
                valid_for_generation,
            },
        }
    }

    /// Returns the value produced by `Caveats::top`.
    #[classmethod]
    fn top(_cls: &Bound<'_, PyType>) -> Self {
        let inner = Caveats::top();
        Self { inner }
    }

    /// The `fs_read` scope: `None` for `Scope::All`, otherwise its members as a list.
    #[getter]
    fn fs_read(&self) -> Option<Vec<String>> {
        let scope = &self.inner.fs_read;
        opt_from_scope(scope)
    }

    /// The `fs_write` scope: `None` for `Scope::All`, otherwise its members as a list.
    #[getter]
    fn fs_write(&self) -> Option<Vec<String>> {
        let scope = &self.inner.fs_write;
        opt_from_scope(scope)
    }

    /// The `exec` scope: `None` for `Scope::All`, otherwise its members as a list.
    #[getter]
    fn exec(&self) -> Option<Vec<String>> {
        let scope = &self.inner.exec;
        opt_from_scope(scope)
    }

    /// The `net` scope: `None` for `Scope::All`, otherwise its members as a list.
    #[getter]
    fn net(&self) -> Option<Vec<String>> {
        let scope = &self.inner.net;
        opt_from_scope(scope)
    }

    /// The `valid_for_generation` scope: `None` for `Scope::All`, otherwise its members.
    #[getter]
    fn valid_for_generation(&self) -> Option<Vec<u64>> {
        let scope = &self.inner.valid_for_generation;
        opt_from_scope(scope)
    }

    /// The call bound: `None` for `CountBound::Unlimited`, otherwise the `AtMost` value.
    #[getter]
    fn max_calls(&self) -> Option<u64> {
        if let CountBound::AtMost(limit) = self.inner.max_calls {
            Some(limit)
        } else {
            None
        }
    }

    /// Returns the result of `Caveats::leq(self, other)`.
    fn leq(&self, other: &Self) -> bool {
        self.inner.leq(&other.inner)
    }

    /// Returns the result of `Caveats::meet(self, other)` as a new object.
    fn meet(&self, other: &Self) -> Self {
        let inner = self.inner.meet(&other.inner);
        Self { inner }
    }

    /// Two caveat sets are equal when their wrapped values compare equal.
    fn __eq__(&self, other: &Self) -> bool {
        self.inner == other.inner
    }

    /// `repr()` yields `Caveats(...)` around the wrapped value's `Debug` output.
    fn __repr__(&self) -> String {
        format!("Caveats({:?})", self.inner)
    }

    /// Returns the caveats as a Python object.
    ///
    /// The value is serialized to JSON text with `serde_json`, then decoded with
    /// Python's `json.loads`. Raises `MeshError` if serialization fails.
    fn to_json<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let encoded = match serde_json::to_string(&self.inner) {
            Ok(text) => text,
            Err(e) => return Err(PyMeshError::new_err(format!("encode caveats: {e}"))),
        };
        let loads = py.import("json")?.getattr("loads")?;
        loads.call1((PyString::new(py, &encoded),))
    }

    /// Builds caveats from JSON.
    ///
    /// A Python `str` is used directly as JSON text. Any other object is first
    /// serialized with Python's `json.dumps`. Raises `MeshError` if the text
    /// cannot be decoded into caveats.
    #[classmethod]
    fn from_json(_cls: &Bound<'_, PyType>, data: &Bound<'_, PyAny>) -> PyResult<Self> {
        let json_str: String = match data.cast::<PyString>() {
            Ok(text) => text.to_str()?.to_string(),
            Err(_) => {
                let dumps = data.py().import("json")?.getattr("dumps")?;
                dumps.call1((data,))?.extract::<String>()?
            }
        };
        serde_json::from_str::<Caveats>(&json_str)
            .map(|inner| Self { inner })
            .map_err(|e| PyMeshError::new_err(format!("decode caveats: {e}")))
    }
}
/// Wrapper around the native `AgentMetadata`, exposed to Python as `AgentMetadata`
/// in `agent_mesh._agent_mesh.core`. The class is declared `frozen`.
#[pyclass(
    name = "AgentMetadata",
    module = "agent_mesh._agent_mesh.core",
    frozen,
    from_py_object
)]
#[derive(Clone)]
pub struct PyAgentMetadata {
    /// The wrapped native metadata record.
    pub inner: AgentMetadata,
}

#[pymethods]
impl PyAgentMetadata {
    /// Builds a metadata record from its fields.
    ///
    /// `expires_at` defaults to `None`. When `caveats` is omitted, the value of
    /// `Caveats::top()` is stored.
    #[new]
    #[pyo3(signature = (role, host, capabilities, issued_at, expires_at = None, caveats = None))]
    fn new(
        role: String,
        host: String,
        capabilities: Vec<String>,
        issued_at: String,
        expires_at: Option<String>,
        caveats: Option<PyCaveats>,
    ) -> Self {
        let caveats = match caveats {
            Some(given) => given.inner,
            None => Caveats::top(),
        };
        let inner = AgentMetadata {
            role,
            host,
            capabilities,
            issued_at,
            expires_at,
            caveats,
        };
        Self { inner }
    }

    /// The stored `role` string.
    #[getter]
    fn role(&self) -> &str {
        self.inner.role.as_str()
    }

    /// The stored `host` string.
    #[getter]
    fn host(&self) -> &str {
        self.inner.host.as_str()
    }

    /// A copy of the stored capability list.
    #[getter]
    fn capabilities(&self) -> Vec<String> {
        self.inner.capabilities.to_vec()
    }

    /// The stored `issued_at` string.
    #[getter]
    fn issued_at(&self) -> &str {
        self.inner.issued_at.as_str()
    }

    /// A copy of the stored `expires_at` string, or `None` when unset.
    #[getter]
    fn expires_at(&self) -> Option<String> {
        self.inner.expires_at.as_deref().map(str::to_owned)
    }

    /// A copy of the stored caveats, wrapped for Python.
    #[getter]
    fn caveats(&self) -> PyCaveats {
        let inner = self.inner.caveats.clone();
        PyCaveats { inner }
    }
}
/// Wrapper around the native `CertChain`, exposed to Python as `CertChain`
/// in `agent_mesh._agent_mesh.core`. The class is declared `frozen`.
#[pyclass(
    name = "CertChain",
    module = "agent_mesh._agent_mesh.core",
    frozen,
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyCertChain {
    /// The wrapped native certificate chain.
    pub inner: CertChain,
}

#[pymethods]
impl PyCertChain {
    /// Runs `CertChain::verify`, raising `MeshError` if it returns an error.
    fn verify(&self) -> PyResult<()> {
        let outcome = self.inner.verify();
        outcome.map_err(mesh_err_to_py)
    }

    /// Returns the fingerprint reported by `CertChain::agent_fingerprint`.
    fn agent_fingerprint(&self) -> PyFingerprint {
        let inner = self.inner.agent_fingerprint();
        PyFingerprint { inner }
    }

    /// Returns the fingerprint reported by `CertChain::user_fingerprint`.
    fn user_fingerprint(&self) -> PyFingerprint {
        let inner = self.inner.user_fingerprint();
        PyFingerprint { inner }
    }
}
/// Wrapper around the native `AgentKey`, exposed to Python as `AgentKey`
/// in `agent_mesh._agent_mesh.core`.
#[pyclass(name = "AgentKey", module = "agent_mesh._agent_mesh.core")]
pub struct PyAgentKey {
    /// The wrapped native agent key.
    pub inner: AgentKey,
}

#[pymethods]
impl PyAgentKey {
    /// Creates an agent key by calling `AgentKey::issue` with the user's key and `metadata`.
    #[classmethod]
    fn issue(_cls: &Bound<'_, PyType>, user: &PyUserKey, metadata: PyAgentMetadata) -> Self {
        let inner = AgentKey::issue(&user.inner, metadata.inner);
        Self { inner }
    }

    /// Returns the fingerprint reported by `AgentKey::fingerprint`.
    fn fingerprint(&self) -> PyFingerprint {
        let inner = self.inner.fingerprint();
        PyFingerprint { inner }
    }

    /// Returns a copy of the certificate chain held by this key.
    fn cert(&self) -> PyCertChain {
        let inner = self.inner.cert().clone();
        PyCertChain { inner }
    }

    /// Signs `message` with `AgentKey::sign` and returns the signature bytes.
    fn sign<'py>(&self, py: Python<'py>, message: &[u8]) -> Bound<'py, PyBytes> {
        PyBytes::new(py, &self.inner.sign(message).to_bytes())
    }

    /// Returns the output of `AgentKey::public_bytes` as a Python `bytes` object.
    fn public_bytes<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = self.inner.public_bytes();
        PyBytes::new(py, &raw)
    }
}
/// Wrapper around the native `Recipient` enum, exposed to Python as `Recipient`
/// in `agent_mesh._agent_mesh.core`. The class is declared `frozen`.
#[pyclass(
    name = "Recipient",
    module = "agent_mesh._agent_mesh.core",
    frozen,
    from_py_object
)]
#[derive(Clone)]
pub struct PyRecipient {
    /// The wrapped native recipient.
    pub inner: Recipient,
}

#[pymethods]
impl PyRecipient {
    /// Builds a `Recipient::Direct` addressed to the given agent fingerprint.
    #[classmethod]
    fn direct(_cls: &Bound<'_, PyType>, agent_fp: &PyFingerprint) -> Self {
        let agent_fp = agent_fp.inner;
        Self {
            inner: Recipient::Direct { agent_fp },
        }
    }

    /// Builds a `Recipient::Topic` with the given topic name.
    #[classmethod]
    fn topic(_cls: &Bound<'_, PyType>, name: String) -> Self {
        let inner = Recipient::Topic { name };
        Self { inner }
    }

    /// Builds a `Recipient::Anycast` with the given capability.
    #[classmethod]
    fn anycast(_cls: &Bound<'_, PyType>, capability: String) -> Self {
        let inner = Recipient::Anycast { capability };
        Self { inner }
    }

    /// `repr()` mirrors the constructor used: `Recipient.direct(<short fp>)`,
    /// `Recipient.topic('<name>')` or `Recipient.anycast('<capability>')`.
    fn __repr__(&self) -> String {
        let rendered = match &self.inner {
            Recipient::Direct { agent_fp } => format!("Recipient.direct({})", agent_fp.short()),
            Recipient::Topic { name } => format!("Recipient.topic('{name}')"),
            Recipient::Anycast { capability } => format!("Recipient.anycast('{capability}')"),
        };
        rendered
    }
}
/// Wrapper around the native `SignedEnvelope`, exposed to Python as `SignedEnvelope`
/// in `agent_mesh._agent_mesh.core`. The class is declared `frozen`.
#[pyclass(
    name = "SignedEnvelope",
    module = "agent_mesh._agent_mesh.core",
    frozen,
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PySignedEnvelope {
    /// The wrapped native envelope.
    pub inner: SignedEnvelope,
}

#[pymethods]
impl PySignedEnvelope {
    /// Builds an envelope by forwarding the sender key, recipient, sequence number
    /// and payload to `SignedEnvelope::new`.
    #[new]
    fn new(sender: &PyAgentKey, recipient: PyRecipient, sequence: u64, payload: Vec<u8>) -> Self {
        let inner = SignedEnvelope::new(&sender.inner, recipient.inner, sequence, payload);
        Self { inner }
    }

    /// Runs `SignedEnvelope::verify`, raising `MeshError` if it returns an error.
    fn verify(&self) -> PyResult<()> {
        let outcome = self.inner.verify();
        outcome.map_err(mesh_err_to_py)
    }

    /// Returns the fingerprint reported by `SignedEnvelope::sender_agent_fp`.
    fn sender_agent_fp(&self) -> PyFingerprint {
        let inner = self.inner.sender_agent_fp();
        PyFingerprint { inner }
    }

    /// Returns the fingerprint reported by `SignedEnvelope::sender_user_fp`.
    fn sender_user_fp(&self) -> PyFingerprint {
        let inner = self.inner.sender_user_fp();
        PyFingerprint { inner }
    }

    /// The envelope's stored sequence number.
    #[getter]
    fn sequence(&self) -> u64 {
        self.inner.sequence
    }

    /// Returns a copy of the envelope payload as a Python `bytes` object.
    fn payload<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = self.inner.payload.as_ref();
        PyBytes::new(py, raw)
    }
}
/// Wrapper around the native `GitHubBinding`, exposed to Python as `GitHubBinding`
/// in `agent_mesh._agent_mesh.core`. The class is declared `frozen`.
#[pyclass(
    name = "GitHubBinding",
    module = "agent_mesh._agent_mesh.core",
    frozen,
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyGitHubBinding {
    /// The wrapped native binding.
    pub inner: GitHubBinding,
}

#[pymethods]
impl PyGitHubBinding {
    /// Creates a binding with `GitHubBinding::sign`.
    ///
    /// `ssh_private_openssh` is parsed as an OpenSSH private key first.
    /// Raises `MeshError` if the key cannot be parsed or signing fails.
    #[classmethod]
    #[pyo3(signature = (user_public, ssh_private_openssh, github_username = None))]
    fn sign(
        _cls: &Bound<'_, PyType>,
        user_public: &PyUserPublic,
        ssh_private_openssh: &[u8],
        github_username: Option<String>,
    ) -> PyResult<Self> {
        let private_key = match SshPrivateKey::from_openssh(ssh_private_openssh) {
            Ok(key) => key,
            Err(e) => return Err(PyMeshError::new_err(format!("parse ssh key: {e}"))),
        };
        GitHubBinding::sign(&user_public.inner, &private_key, github_username)
            .map(|inner| Self { inner })
            .map_err(mesh_err_to_py)
    }

    /// Checks the binding against a 32-byte candidate SSH public key.
    ///
    /// Raises `MeshError` if the candidate is not 32 bytes long or if
    /// `GitHubBinding::verify` returns an error.
    fn verify(&self, candidate_ssh_pubkey: &[u8]) -> PyResult<()> {
        if candidate_ssh_pubkey.len() == 32 {
            let mut raw = [0u8; 32];
            raw.copy_from_slice(candidate_ssh_pubkey);
            return self.inner.verify(&raw).map_err(mesh_err_to_py);
        }
        Err(PyMeshError::new_err(format!(
            "expected 32-byte ssh pubkey, got {}",
            candidate_ssh_pubkey.len()
        )))
    }

    /// Returns a copy of the user public key stored in the binding.
    fn user_public(&self) -> PyUserPublic {
        let inner = self.inner.user_pubkey.clone();
        PyUserPublic { inner }
    }

    /// Returns the stored SSH public key as lowercase hexadecimal text.
    fn ssh_pubkey_hex(&self) -> String {
        let raw = &self.inner.ssh_pubkey;
        hex_encode(raw)
    }

    /// The stored GitHub username, or `None` when the binding has none.
    #[getter]
    fn github_username(&self) -> Option<String> {
        self.inner.github_username.clone()
    }

    /// Returns the binding as a Python object.
    ///
    /// The value is serialized to JSON text with `serde_json`, then decoded with
    /// Python's `json.loads`. Raises `MeshError` if serialization fails.
    fn to_json<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let encoded = match serde_json::to_string(&self.inner) {
            Ok(text) => text,
            Err(e) => return Err(PyMeshError::new_err(format!("encode binding: {e}"))),
        };
        let loads = py.import("json")?.getattr("loads")?;
        loads.call1((PyString::new(py, &encoded),))
    }

    /// Builds a binding from JSON.
    ///
    /// A Python `str` is used directly as JSON text. Any other object is first
    /// serialized with Python's `json.dumps`. Raises `MeshError` if the text
    /// cannot be decoded into a binding.
    #[classmethod]
    fn from_json(_cls: &Bound<'_, PyType>, data: &Bound<'_, PyAny>) -> PyResult<Self> {
        let json_str: String = match data.cast::<PyString>() {
            Ok(text) => text.to_str()?.to_string(),
            Err(_) => {
                let dumps = data.py().import("json")?.getattr("dumps")?;
                dumps.call1((data,))?.extract::<String>()?
            }
        };
        serde_json::from_str::<GitHubBinding>(&json_str)
            .map(|inner| Self { inner })
            .map_err(|e| PyMeshError::new_err(format!("decode binding: {e}")))
    }

    /// Returns `true` only if `line` (trimmed) parses as an OpenSSH public key,
    /// `ssh_pubkey_ed25519_bytes` accepts it, and the binding verifies against
    /// the resulting bytes. Any failure along the way yields `false`.
    fn try_verify_ssh_line(&self, line: &str) -> bool {
        ssh_key::PublicKey::from_openssh(line.trim())
            .ok()
            .and_then(|key| ssh_pubkey_ed25519_bytes(&key).ok())
            .map_or(false, |raw| self.inner.verify(&raw).is_ok())
    }
}
/// Encodes `bytes` as lowercase hexadecimal text, two digits per input byte.
///
/// Returns an empty string for empty input. Digits come from a lookup table
/// rather than `format!`, which would allocate a temporary `String` per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble, matching the `{:02x}` layout.
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
/// Parses `line` (trimmed) as an OpenSSH public key and returns the bytes that
/// `ssh_pubkey_ed25519_bytes` extracts from it.
///
/// Raises `MeshError` if the line cannot be parsed or the extraction fails.
#[pyfunction]
fn ssh_authorized_key_to_ed25519_bytes(line: &str) -> PyResult<Vec<u8>> {
    let parsed = match ssh_key::PublicKey::from_openssh(line.trim()) {
        Ok(key) => key,
        Err(e) => return Err(PyMeshError::new_err(format!("parse openssh pubkey: {e}"))),
    };
    ssh_pubkey_ed25519_bytes(&parsed)
        .map(|raw| raw.to_vec())
        .map_err(mesh_err_to_py)
}
/// Creates the `core` submodule, fills it with this file's classes, the
/// `ssh_authorized_key_to_ed25519_bytes` function and the `MeshError` exception,
/// then attaches it to `parent`.
///
/// Any error from module creation or registration is returned to the caller.
pub fn register(py: Python<'_>, parent: &Bound<'_, PyModule>) -> PyResult<()> {
    let submodule = PyModule::new(py, "core")?;

    // Registration order is kept as-is.
    submodule.add_class::<PyFingerprint>()?;
    submodule.add_class::<PyUserKey>()?;
    submodule.add_class::<PyUserPublic>()?;
    submodule.add_class::<PyCaveats>()?;
    submodule.add_class::<PyAgentMetadata>()?;
    submodule.add_class::<PyAgentKey>()?;
    submodule.add_class::<PyCertChain>()?;
    submodule.add_class::<PyRecipient>()?;
    submodule.add_class::<PySignedEnvelope>()?;
    submodule.add_class::<PyGitHubBinding>()?;

    submodule.add_function(wrap_pyfunction!(
        ssh_authorized_key_to_ed25519_bytes,
        &submodule
    )?)?;
    submodule.add("MeshError", py.get_type::<PyMeshError>())?;

    // NOTE(review): `add_submodule` alone may not make the module importable by its
    // dotted path; confirm whether the parent also registers it in `sys.modules`.
    parent.add_submodule(&submodule)
}