Skip to main content

typesec_native/
lib.rs

1//! Rust-backed Python bindings for Typesec policy decisions.
2
3use pyo3::exceptions::{PyPermissionError, PyValueError};
4use pyo3::prelude::*;
5use typesec_core::policy::{PolicyEngine, PolicyResult};
6
/// Policy dialect a YAML document is interpreted as.
///
/// `Debug`, `PartialEq`, and `Eq` are derived in addition to `Clone`/`Copy` so
/// the selected format can be logged and compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PolicyFormat {
    /// Role-based policy, handled through `typesec_rbac::RbacEngine`.
    Rbac,
    /// ODRL policy, handled through `typesec_odrl::OdrlEngine`.
    Odrl,
    /// Graph policy, handled through `typesec_rbac::GraphPolicyEngine`.
    Graph,
}
13
14impl PolicyFormat {
15    fn detect(explicit: Option<&str>, yaml: &str) -> PyResult<Self> {
16        match explicit {
17            Some("rbac") => Ok(Self::Rbac),
18            Some("odrl") => Ok(Self::Odrl),
19            Some("graph") => Ok(Self::Graph),
20            Some(other) => Err(PyValueError::new_err(format!(
21                "unsupported policy format '{other}'; use rbac, odrl, or graph"
22            ))),
23            None if yaml.contains("graph_policy:") => Ok(Self::Graph),
24            None if yaml.contains("roles:") => Ok(Self::Rbac),
25            None if yaml.contains("policies:") => Ok(Self::Odrl),
26            None => Err(PyValueError::new_err(
27                "could not detect policy format; pass format='rbac', 'odrl', or 'graph'",
28            )),
29        }
30    }
31}
32
33#[pyclass(frozen)]
34#[derive(Clone)]
35struct Decision {
36    #[pyo3(get)]
37    allowed: bool,
38    #[pyo3(get)]
39    subject: String,
40    #[pyo3(get)]
41    action: String,
42    #[pyo3(get)]
43    resource: String,
44    #[pyo3(get)]
45    reason: Option<String>,
46}
47
48#[pymethods]
49impl Decision {
50    fn __repr__(&self) -> String {
51        let verdict = if self.allowed { "ALLOW" } else { "DENY" };
52        match &self.reason {
53            Some(reason) => format!(
54                "Decision({verdict}, subject={:?}, action={:?}, resource={:?}, reason={:?})",
55                self.subject, self.action, self.resource, reason
56            ),
57            None => format!(
58                "Decision({verdict}, subject={:?}, action={:?}, resource={:?})",
59                self.subject, self.action, self.resource
60            ),
61        }
62    }
63}
64
65#[pyclass]
66struct TypesecGate {
67    yaml: String,
68    format: PolicyFormat,
69}
70
71#[pymethods]
72impl TypesecGate {
73    #[new]
74    #[pyo3(signature = (policy_yaml, format = None))]
75    fn new(policy_yaml: String, format: Option<&str>) -> PyResult<Self> {
76        let format = PolicyFormat::detect(format, &policy_yaml)?;
77        validate_policy(&policy_yaml, format)?;
78        Ok(Self {
79            yaml: policy_yaml,
80            format,
81        })
82    }
83
84    #[staticmethod]
85    #[pyo3(signature = (path, format = None))]
86    fn from_file(path: &str, format: Option<&str>) -> PyResult<Self> {
87        let yaml = std::fs::read_to_string(path)
88            .map_err(|err| PyValueError::new_err(format!("failed to read policy: {err}")))?;
89        Self::new(yaml, format)
90    }
91
92    #[pyo3(signature = (subject, action, resource, purpose = None))]
93    fn check(
94        &self,
95        subject: &str,
96        action: &str,
97        resource: &str,
98        purpose: Option<&str>,
99    ) -> PyResult<Decision> {
100        check_policy(&self.yaml, self.format, subject, action, resource, purpose)
101    }
102
103    #[pyo3(signature = (subject, action, resource, purpose = None))]
104    fn require(
105        &self,
106        subject: &str,
107        action: &str,
108        resource: &str,
109        purpose: Option<&str>,
110    ) -> PyResult<Decision> {
111        let decision = self.check(subject, action, resource, purpose)?;
112        if decision.allowed {
113            Ok(decision)
114        } else {
115            let reason = decision
116                .reason
117                .clone()
118                .unwrap_or_else(|| "access denied".to_string());
119            Err(PyPermissionError::new_err(reason))
120        }
121    }
122}
123
124#[pyfunction]
125#[pyo3(signature = (policy_yaml, subject, action, resource, format = None, purpose = None))]
126fn check(
127    policy_yaml: &str,
128    subject: &str,
129    action: &str,
130    resource: &str,
131    format: Option<&str>,
132    purpose: Option<&str>,
133) -> PyResult<Decision> {
134    let format = PolicyFormat::detect(format, policy_yaml)?;
135    check_policy(policy_yaml, format, subject, action, resource, purpose)
136}
137
138#[pyfunction]
139#[pyo3(signature = (policy_yaml, format = None))]
140fn validate(policy_yaml: &str, format: Option<&str>) -> PyResult<()> {
141    let format = PolicyFormat::detect(format, policy_yaml)?;
142    validate_policy(policy_yaml, format)
143}
144
145fn validate_policy(yaml: &str, format: PolicyFormat) -> PyResult<()> {
146    match format {
147        PolicyFormat::Rbac => {
148            let policy = typesec_rbac::RbacPolicy::from_yaml(yaml)
149                .map_err(|err| PyValueError::new_err(format!("RBAC YAML parse error: {err}")))?;
150            policy.validate().map_err(PyValueError::new_err)
151        }
152        PolicyFormat::Odrl => {
153            typesec_odrl::model::OdrlDocument::from_yaml(yaml)
154                .map_err(|err| PyValueError::new_err(format!("ODRL YAML parse error: {err}")))?;
155            Ok(())
156        }
157        PolicyFormat::Graph => {
158            let doc = typesec_rbac::graph_policy::GraphPolicyDocument::from_yaml(yaml).map_err(
159                |err| PyValueError::new_err(format!("graph policy YAML parse error: {err}")),
160            )?;
161            doc.validate().map_err(PyValueError::new_err)
162        }
163    }
164}
165
166fn check_policy(
167    yaml: &str,
168    format: PolicyFormat,
169    subject: &str,
170    action: &str,
171    resource: &str,
172    purpose: Option<&str>,
173) -> PyResult<Decision> {
174    let result = match format {
175        PolicyFormat::Rbac => {
176            let engine = typesec_rbac::RbacEngine::from_yaml(yaml)
177                .map_err(|err| PyValueError::new_err(format!("RBAC YAML parse error: {err}")))?;
178            engine.check(subject, action, resource)
179        }
180        PolicyFormat::Odrl => {
181            let base = typesec_odrl::OdrlEngine::from_yaml(yaml)
182                .map_err(|err| PyValueError::new_err(format!("ODRL YAML parse error: {err}")))?;
183            let engine = if let Some(purpose) = purpose {
184                let ctx = typesec_odrl::constraint::ConstraintContext::default()
185                    .with_purpose(purpose.to_string());
186                base.with_context(ctx)
187            } else {
188                base
189            };
190            engine.check(subject, action, resource)
191        }
192        PolicyFormat::Graph => {
193            let engine =
194                typesec_rbac::GraphPolicyEngine::from_yaml(yaml).map_err(PyValueError::new_err)?;
195            engine.check(subject, action, resource)
196        }
197    };
198
199    Ok(match result {
200        PolicyResult::Allow => Decision {
201            allowed: true,
202            subject: subject.to_string(),
203            action: action.to_string(),
204            resource: resource.to_string(),
205            reason: None,
206        },
207        PolicyResult::Deny(reason) => Decision {
208            allowed: false,
209            subject: subject.to_string(),
210            action: action.to_string(),
211            resource: resource.to_string(),
212            reason: Some(reason),
213        },
214        PolicyResult::Delegate(to) => Decision {
215            allowed: false,
216            subject: subject.to_string(),
217            action: action.to_string(),
218            resource: resource.to_string(),
219            reason: Some(format!("policy delegated to {to}")),
220        },
221    })
222}
223
224#[pymodule]
225fn typesec_native(m: &Bound<'_, PyModule>) -> PyResult<()> {
226    m.add_class::<Decision>()?;
227    m.add_class::<TypesecGate>()?;
228    m.add_function(wrap_pyfunction!(check, m)?)?;
229    m.add_function(wrap_pyfunction!(validate, m)?)?;
230    Ok(())
231}