1use pyo3::exceptions::{PyPermissionError, PyValueError};
4use pyo3::prelude::*;
5use typesec_core::policy::{PolicyEngine, PolicyResult};
6
/// Policy dialect understood by this module.
///
/// The variant decides which parser/engine is used to validate and evaluate
/// a policy document.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PolicyFormat {
    /// Role-based access control policy.
    Rbac,
    /// ODRL policy document.
    Odrl,
    /// Graph policy document.
    Graph,
}
13
14impl PolicyFormat {
15 fn detect(explicit: Option<&str>, yaml: &str) -> PyResult<Self> {
16 match explicit {
17 Some("rbac") => Ok(Self::Rbac),
18 Some("odrl") => Ok(Self::Odrl),
19 Some("graph") => Ok(Self::Graph),
20 Some(other) => Err(PyValueError::new_err(format!(
21 "unsupported policy format '{other}'; use rbac, odrl, or graph"
22 ))),
23 None if yaml.contains("graph_policy:") => Ok(Self::Graph),
24 None if yaml.contains("roles:") => Ok(Self::Rbac),
25 None if yaml.contains("policies:") => Ok(Self::Odrl),
26 None => Err(PyValueError::new_err(
27 "could not detect policy format; pass format='rbac', 'odrl', or 'graph'",
28 )),
29 }
30 }
31}
32
33#[pyclass(frozen)]
34#[derive(Clone)]
35struct Decision {
36 #[pyo3(get)]
37 allowed: bool,
38 #[pyo3(get)]
39 subject: String,
40 #[pyo3(get)]
41 action: String,
42 #[pyo3(get)]
43 resource: String,
44 #[pyo3(get)]
45 reason: Option<String>,
46}
47
48#[pymethods]
49impl Decision {
50 fn __repr__(&self) -> String {
51 let verdict = if self.allowed { "ALLOW" } else { "DENY" };
52 match &self.reason {
53 Some(reason) => format!(
54 "Decision({verdict}, subject={:?}, action={:?}, resource={:?}, reason={:?})",
55 self.subject, self.action, self.resource, reason
56 ),
57 None => format!(
58 "Decision({verdict}, subject={:?}, action={:?}, resource={:?})",
59 self.subject, self.action, self.resource
60 ),
61 }
62 }
63}
64
65#[pyclass]
66struct TypesecGate {
67 yaml: String,
68 format: PolicyFormat,
69}
70
71#[pymethods]
72impl TypesecGate {
73 #[new]
74 #[pyo3(signature = (policy_yaml, format = None))]
75 fn new(policy_yaml: String, format: Option<&str>) -> PyResult<Self> {
76 let format = PolicyFormat::detect(format, &policy_yaml)?;
77 validate_policy(&policy_yaml, format)?;
78 Ok(Self {
79 yaml: policy_yaml,
80 format,
81 })
82 }
83
84 #[staticmethod]
85 #[pyo3(signature = (path, format = None))]
86 fn from_file(path: &str, format: Option<&str>) -> PyResult<Self> {
87 let yaml = std::fs::read_to_string(path)
88 .map_err(|err| PyValueError::new_err(format!("failed to read policy: {err}")))?;
89 Self::new(yaml, format)
90 }
91
92 #[pyo3(signature = (subject, action, resource, purpose = None))]
93 fn check(
94 &self,
95 subject: &str,
96 action: &str,
97 resource: &str,
98 purpose: Option<&str>,
99 ) -> PyResult<Decision> {
100 check_policy(&self.yaml, self.format, subject, action, resource, purpose)
101 }
102
103 #[pyo3(signature = (subject, action, resource, purpose = None))]
104 fn require(
105 &self,
106 subject: &str,
107 action: &str,
108 resource: &str,
109 purpose: Option<&str>,
110 ) -> PyResult<Decision> {
111 let decision = self.check(subject, action, resource, purpose)?;
112 if decision.allowed {
113 Ok(decision)
114 } else {
115 let reason = decision
116 .reason
117 .clone()
118 .unwrap_or_else(|| "access denied".to_string());
119 Err(PyPermissionError::new_err(reason))
120 }
121 }
122}
123
124#[pyfunction]
125#[pyo3(signature = (policy_yaml, subject, action, resource, format = None, purpose = None))]
126fn check(
127 policy_yaml: &str,
128 subject: &str,
129 action: &str,
130 resource: &str,
131 format: Option<&str>,
132 purpose: Option<&str>,
133) -> PyResult<Decision> {
134 let format = PolicyFormat::detect(format, policy_yaml)?;
135 check_policy(policy_yaml, format, subject, action, resource, purpose)
136}
137
138#[pyfunction]
139#[pyo3(signature = (policy_yaml, format = None))]
140fn validate(policy_yaml: &str, format: Option<&str>) -> PyResult<()> {
141 let format = PolicyFormat::detect(format, policy_yaml)?;
142 validate_policy(policy_yaml, format)
143}
144
145fn validate_policy(yaml: &str, format: PolicyFormat) -> PyResult<()> {
146 match format {
147 PolicyFormat::Rbac => {
148 let policy = typesec_rbac::RbacPolicy::from_yaml(yaml)
149 .map_err(|err| PyValueError::new_err(format!("RBAC YAML parse error: {err}")))?;
150 policy.validate().map_err(PyValueError::new_err)
151 }
152 PolicyFormat::Odrl => {
153 typesec_odrl::model::OdrlDocument::from_yaml(yaml)
154 .map_err(|err| PyValueError::new_err(format!("ODRL YAML parse error: {err}")))?;
155 Ok(())
156 }
157 PolicyFormat::Graph => {
158 let doc = typesec_rbac::graph_policy::GraphPolicyDocument::from_yaml(yaml).map_err(
159 |err| PyValueError::new_err(format!("graph policy YAML parse error: {err}")),
160 )?;
161 doc.validate().map_err(PyValueError::new_err)
162 }
163 }
164}
165
166fn check_policy(
167 yaml: &str,
168 format: PolicyFormat,
169 subject: &str,
170 action: &str,
171 resource: &str,
172 purpose: Option<&str>,
173) -> PyResult<Decision> {
174 let result = match format {
175 PolicyFormat::Rbac => {
176 let engine = typesec_rbac::RbacEngine::from_yaml(yaml)
177 .map_err(|err| PyValueError::new_err(format!("RBAC YAML parse error: {err}")))?;
178 engine.check(subject, action, resource)
179 }
180 PolicyFormat::Odrl => {
181 let base = typesec_odrl::OdrlEngine::from_yaml(yaml)
182 .map_err(|err| PyValueError::new_err(format!("ODRL YAML parse error: {err}")))?;
183 let engine = if let Some(purpose) = purpose {
184 let ctx = typesec_odrl::constraint::ConstraintContext::default()
185 .with_purpose(purpose.to_string());
186 base.with_context(ctx)
187 } else {
188 base
189 };
190 engine.check(subject, action, resource)
191 }
192 PolicyFormat::Graph => {
193 let engine =
194 typesec_rbac::GraphPolicyEngine::from_yaml(yaml).map_err(PyValueError::new_err)?;
195 engine.check(subject, action, resource)
196 }
197 };
198
199 Ok(match result {
200 PolicyResult::Allow => Decision {
201 allowed: true,
202 subject: subject.to_string(),
203 action: action.to_string(),
204 resource: resource.to_string(),
205 reason: None,
206 },
207 PolicyResult::Deny(reason) => Decision {
208 allowed: false,
209 subject: subject.to_string(),
210 action: action.to_string(),
211 resource: resource.to_string(),
212 reason: Some(reason),
213 },
214 PolicyResult::Delegate(to) => Decision {
215 allowed: false,
216 subject: subject.to_string(),
217 action: action.to_string(),
218 resource: resource.to_string(),
219 reason: Some(format!("policy delegated to {to}")),
220 },
221 })
222}
223
224#[pymodule]
225fn typesec_native(m: &Bound<'_, PyModule>) -> PyResult<()> {
226 m.add_class::<Decision>()?;
227 m.add_class::<TypesecGate>()?;
228 m.add_function(wrap_pyfunction!(check, m)?)?;
229 m.add_function(wrap_pyfunction!(validate, m)?)?;
230 Ok(())
231}