use crate::client::AxonFlowClient;
use crate::error::AxonFlowError;
use crate::types::pep::{
DecideRequest, DecideResponse, MCPCheckInputRequest, MCPCheckInputResponse, Obligation,
};
/// Obligation `type` string that `has_request_redaction` and `fulfill_request` act on.
pub const OBLIGATION_REDACT_PII: &str = "redact_pii";
/// Fulfillment `phase` string a `redact_pii` obligation must carry to be fulfilled by
/// `fulfill_request`.
pub const PHASE_REQUEST: &str = "request";
/// Fulfillment `phase` string `"response"`; `fulfill_request` rejects `redact_pii`
/// obligations carrying it.
pub const PHASE_RESPONSE: &str = "response";
/// Content type sent with check-input requests and required in a fulfillment's non-empty
/// `content_types` list.
pub const CONTENT_TYPE_TEXT: &str = "text/plain";
/// Verdict string for which `decide_and_fulfill` goes on to fulfill obligations.
pub const VERDICT_ALLOW: &str = "allow";
/// Verdict string `"deny"`. NOTE(review): the non-test code visible here only compares
/// against `VERDICT_ALLOW`; this constant is presumably for callers.
pub const VERDICT_DENY: &str = "deny";
/// Verdict string `"needs_approval"`. NOTE(review): like `VERDICT_DENY`, presumably
/// exported for callers; it is treated as "not allow" by `decide_and_fulfill`.
pub const VERDICT_NEEDS_APPROVAL: &str = "needs_approval";
/// Path appended to the client endpoint by `AxonFlowClient::decide`.
pub const DECIDE_PATH: &str = "/api/v1/decide";
/// Path appended to the client endpoint for request redaction, and the only fulfillment
/// endpoint path `fulfill_request` accepts.
pub const REQUEST_REDACTION_PATH: &str = "/api/v1/mcp/check-input";
/// Path string for the check-output route. NOTE(review): presumably the response-phase
/// counterpart of `REQUEST_REDACTION_PATH`; no non-test code visible here calls it.
pub const RESPONSE_REDACTION_PATH: &str = "/api/v1/mcp/check-output";
/// Value sent as `connector_type` in check-input requests.
pub const GATEWAY_CONNECTOR_TAG: &str = "gateway";
/// Reports whether `obligations` contains at least one `redact_pii` obligation whose
/// fulfillment is declared for the request phase.
///
/// Obligations of another type, obligations without a fulfillment, and obligations whose
/// fulfillment names a different phase do not count. An empty slice yields `false`.
pub fn has_request_redaction(obligations: &[Obligation]) -> bool {
    for obligation in obligations {
        if obligation.r#type != OBLIGATION_REDACT_PII {
            continue;
        }
        if let Some(fulfillment) = obligation.fulfillment.as_ref() {
            if fulfillment.phase == PHASE_REQUEST {
                return true;
            }
        }
    }
    false
}
/// Reports whether `endpoint` designates the path `expected`.
///
/// Two forms are accepted, after trimming surrounding whitespace:
///
/// * a string equal to `expected` (the relative-path form), or
/// * an absolute URL `scheme://authority/path[?query]` whose path, with any query string
///   removed, equals `expected`. The scheme and authority values themselves are not compared.
///
/// Everything else is rejected. In particular the scheme must be syntactically valid
/// (a letter followed by letters, digits, `+`, `-` or `.`) and the authority must not contain
/// `?` or `#`: without those checks a `://` or `/` that only appears inside a query string or
/// fragment (for example `https://host?next=/api/v1/x`) would be mistaken for the URL's
/// scheme separator or path start and produce a false match.
pub(crate) fn endpoint_path_matches(endpoint: &str, expected: &str) -> bool {
    let endpoint = endpoint.trim();
    if endpoint == expected {
        return true;
    }

    // Anything that is not the exact relative path must be an absolute URL.
    let Some((scheme, rest)) = endpoint.split_once("://") else {
        return false;
    };

    // Reject a "scheme" that is empty or that is really a chunk of a path/query which
    // happens to precede an embedded "://".
    let mut scheme_chars = scheme.chars();
    let scheme_is_valid = scheme_chars
        .next()
        .is_some_and(|c| c.is_ascii_alphabetic())
        && scheme_chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'));
    if !scheme_is_valid {
        return false;
    }

    // No '/' after the authority means the URL has no path at all.
    let Some(slash) = rest.find('/') else {
        return false;
    };
    let (authority, path) = rest.split_at(slash);

    // A '?' or '#' before the first '/' means that '/' belongs to the query or fragment,
    // not to the path.
    if authority.contains(|c: char| c == '?' || c == '#') {
        return false;
    }

    let path = path.split('?').next().unwrap_or(path);
    path == expected
}
impl AxonFlowClient {
    /// Requests a policy decision for `request`.
    ///
    /// POSTs `request` as JSON to [`DECIDE_PATH`] under the client's endpoint and decodes the
    /// response body into a [`DecideResponse`]. The verdict is not inspected here, so a
    /// non-allow verdict comes back as an ordinary `Ok` value.
    ///
    /// # Errors
    ///
    /// Propagates any error from `checked_post_json`, from reading the response body, or from
    /// JSON-decoding it.
    pub async fn decide(&self, request: DecideRequest) -> Result<DecideResponse, AxonFlowError> {
        let url = format!("{}{}", self.endpoint(), DECIDE_PATH);
        let resp = self.checked_post_json(&url, &request).await?;
        let body = resp.text().await?;
        let parsed: DecideResponse = serde_json::from_str(&body)?;
        Ok(parsed)
    }

    /// Fulfills every `redact_pii` obligation in `decision` against `statement`.
    ///
    /// Obligations of other types are skipped. For each `redact_pii` obligation the
    /// fulfillment is validated and the current text is then sent through the
    /// request-redaction engine; the output of one pass is the input of the next.
    ///
    /// Returns the resulting text and a flag that is `true` when, after some pass, the text
    /// differed from the original `statement`.
    ///
    /// # Errors
    ///
    /// Returns [`AxonFlowError::ObligationNotFulfillable`] when a `redact_pii` obligation
    ///
    /// * has no fulfillment, or one whose phase is not [`PHASE_REQUEST`];
    /// * advertises a non-empty `content_types` list without [`CONTENT_TYPE_TEXT`];
    /// * names an endpoint whose path is not [`REQUEST_REDACTION_PATH`];
    ///
    /// or when the engine call itself cannot be completed and trusted. In all of these cases
    /// no text is returned, so a caller cannot accidentally forward the unredacted statement.
    pub async fn fulfill_request(
        &self,
        decision: &DecideResponse,
        statement: &str,
    ) -> Result<(String, bool), AxonFlowError> {
        let mut redacted = statement.to_string();
        let mut did_redact = false;

        for ob in &decision.obligations {
            if ob.r#type != OBLIGATION_REDACT_PII {
                continue;
            }

            let fulfillment = match &ob.fulfillment {
                Some(f) if f.phase == PHASE_REQUEST => f,
                _ => {
                    return Err(AxonFlowError::ObligationNotFulfillable(
                        "redact_pii obligation missing request-phase fulfillment".to_string(),
                    ));
                }
            };

            // An absent or empty list is treated as "no restriction advertised".
            if let Some(cts) = &fulfillment.content_types {
                if !cts.is_empty() && !cts.iter().any(|c| c == CONTENT_TYPE_TEXT) {
                    return Err(AxonFlowError::ObligationNotFulfillable(format!(
                        "fulfillment endpoint does not advertise a {CONTENT_TYPE_TEXT} detector"
                    )));
                }
            }

            // Only the path is validated; the request below always goes to this client's
            // own endpoint, never to a host named by the decision.
            if !endpoint_path_matches(&fulfillment.endpoint, REQUEST_REDACTION_PATH) {
                return Err(AxonFlowError::ObligationNotFulfillable(format!(
                    "fulfillment endpoint {:?} is not the request-redaction endpoint",
                    fulfillment.endpoint
                )));
            }

            redacted = self.fulfill_via_check_input(&redacted).await?;
            if redacted != statement {
                did_redact = true;
            }
        }

        Ok((redacted, did_redact))
    }

    /// Sends `statement` to the request-redaction engine and returns the text to use.
    ///
    /// The request is POSTed to [`REQUEST_REDACTION_PATH`] under the client's endpoint with
    /// `connector_type` [`GATEWAY_CONNECTOR_TAG`], operation `"execute"` and content type
    /// [`CONTENT_TYPE_TEXT`]. When the engine reports `redacted = false` the input is returned
    /// unchanged; when it reports `redacted = true` the non-empty `redacted_statement` is
    /// returned.
    ///
    /// # Errors
    ///
    /// Every failure is reported as [`AxonFlowError::ObligationNotFulfillable`]: a failed
    /// call, an unreadable or undecodable response body, `redaction_evaluated = false`, or
    /// `redacted = true` without a usable `redacted_statement`.
    async fn fulfill_via_check_input(&self, statement: &str) -> Result<String, AxonFlowError> {
        let req = MCPCheckInputRequest {
            connector_type: GATEWAY_CONNECTOR_TAG.to_string(),
            statement: statement.to_string(),
            operation: Some("execute".to_string()),
            tenant_id: None,
            content_type: Some(CONTENT_TYPE_TEXT.to_string()),
        };
        let url = format!("{}{}", self.endpoint(), REQUEST_REDACTION_PATH);

        let resp = self.checked_post_json(&url, &req).await.map_err(|e| {
            AxonFlowError::ObligationNotFulfillable(format!(
                "request-redaction engine call failed: {e}"
            ))
        })?;
        // A body that cannot be read is an engine failure like any other, so it is mapped to
        // the same variant instead of leaking out as a raw transport error.
        let body = resp.text().await.map_err(|e| {
            AxonFlowError::ObligationNotFulfillable(format!(
                "read request-redaction engine response: {e}"
            ))
        })?;
        let result: MCPCheckInputResponse = serde_json::from_str(&body).map_err(|e| {
            AxonFlowError::ObligationNotFulfillable(format!(
                "decode request-redaction engine response: {e}"
            ))
        })?;

        // NOTE(review): only `redaction_evaluated`, `redacted` and `redacted_statement` are
        // consulted; if the response also carries an allow/deny flag it is ignored here.
        // Confirm that is intended.
        if !result.redaction_evaluated {
            return Err(AxonFlowError::ObligationNotFulfillable(
                "engine reported the redactor did not run (redaction disabled)".to_string(),
            ));
        }

        match (result.redacted, result.redacted_statement) {
            (true, Some(masked)) if !masked.is_empty() => Ok(masked),
            (true, _) => Err(AxonFlowError::ObligationNotFulfillable(
                "engine reported redacted=true but returned no redacted_statement".to_string(),
            )),
            (false, _) => Ok(statement.to_string()),
        }
    }

    /// Runs [`decide`](Self::decide) and, on an allow verdict, fulfills the request-phase
    /// redaction obligations for the request's query.
    ///
    /// Returns `(verdict, content, decision)`. For any verdict other than [`VERDICT_ALLOW`]
    /// `content` is the original query and no fulfillment is attempted. For an allow verdict
    /// `content` is the text produced by [`fulfill_request`](Self::fulfill_request).
    ///
    /// # Errors
    ///
    /// Propagates errors from `decide` and from `fulfill_request`; when an obligation cannot
    /// be fulfilled the error is returned instead of the original query.
    pub async fn decide_and_fulfill(
        &self,
        request: DecideRequest,
    ) -> Result<(String, String, DecideResponse), AxonFlowError> {
        // `request` is consumed by `decide`, so keep a copy of the query for fulfillment.
        let query = request.query.clone();
        let decision = self.decide(request).await?;
        if decision.verdict != VERDICT_ALLOW {
            return Ok((decision.verdict.clone(), query, decision));
        }
        let (redacted, _) = self.fulfill_request(&decision, &query).await?;
        Ok((decision.verdict.clone(), redacted, decision))
    }
}
/// Unit tests for the obligation helpers and for the `AxonFlowClient` decide/fulfill methods.
/// The remote side is played by `wiremock` servers; tests that must not reach a server point
/// the client at `http://127.0.0.1:1` (presumably unreachable).
#[cfg(test)]
mod tests {
use super::*;
use crate::types::pep::ObligationFulfillment;
use crate::{AxonFlowConfig, AxonFlowError};
use serde_json::json;
use std::time::Duration;
use wiremock::matchers::{body_partial_json, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Builds a client for `endpoint` with fixed test credentials and a 2-second timeout;
/// panics if the client cannot be constructed.
fn make_client(endpoint: String) -> AxonFlowClient {
let config = AxonFlowConfig {
endpoint,
client_id: Some("org-1".into()),
client_secret: Some("license-1".into()),
timeout: Duration::from_secs(2),
..Default::default()
};
AxonFlowClient::new(config).expect("client init")
}
/// A well-formed `redact_pii` obligation: request phase, request-redaction path,
/// advertising `text/plain`.
fn redact_obligation() -> Obligation {
Obligation {
r#type: OBLIGATION_REDACT_PII.into(),
detail: None,
fulfillment: Some(ObligationFulfillment {
endpoint: REQUEST_REDACTION_PATH.into(),
method: "POST".into(),
phase: PHASE_REQUEST.into(),
content_types: Some(vec![CONTENT_TYPE_TEXT.into()]),
}),
}
}
/// An `allow` decision carrying `obligations`; every other field takes its default.
fn allow_with(obligations: Vec<Obligation>) -> DecideResponse {
DecideResponse {
verdict: VERDICT_ALLOW.into(),
obligations,
..Default::default()
}
}
/// Accepted endpoint forms: exact path, whitespace-padded path, absolute URL with a port,
/// and absolute URL with a query string.
#[test]
fn endpoint_path_matches_exact_and_absolute() {
assert!(endpoint_path_matches(
REQUEST_REDACTION_PATH,
REQUEST_REDACTION_PATH
));
assert!(endpoint_path_matches(
" /api/v1/mcp/check-input ",
REQUEST_REDACTION_PATH
));
assert!(endpoint_path_matches(
"https://pdp.internal:8443/api/v1/mcp/check-input",
REQUEST_REDACTION_PATH
));
assert!(endpoint_path_matches(
"https://pdp.internal/api/v1/mcp/check-input?x=1",
REQUEST_REDACTION_PATH
));
}
/// Rejected endpoint forms: empty string, a different path, an absolute URL with a
/// different path, and an absolute URL with no path at all.
#[test]
fn endpoint_path_matches_rejects_foreign() {
assert!(!endpoint_path_matches("", REQUEST_REDACTION_PATH));
assert!(!endpoint_path_matches(
"/api/v1/mcp/check-output",
REQUEST_REDACTION_PATH
));
assert!(!endpoint_path_matches(
"https://evil.example.com/steal",
REQUEST_REDACTION_PATH
));
assert!(!endpoint_path_matches(
"https://pdp.internal",
REQUEST_REDACTION_PATH
));
}
/// A request-phase `redact_pii` obligation is detected.
#[test]
fn has_request_redaction_detects_request_phase() {
assert!(has_request_redaction(&[redact_obligation()]));
}
/// Response-phase obligations, obligations without a fulfillment, obligations of another
/// type, and an empty slice are all reported as "no request redaction".
#[test]
fn has_request_redaction_ignores_response_phase_and_no_fulfillment() {
let resp_phase = Obligation {
r#type: OBLIGATION_REDACT_PII.into(),
detail: None,
fulfillment: Some(ObligationFulfillment {
endpoint: RESPONSE_REDACTION_PATH.into(),
method: "POST".into(),
phase: PHASE_RESPONSE.into(),
content_types: None,
}),
};
let no_fulfillment = Obligation {
r#type: OBLIGATION_REDACT_PII.into(),
detail: None,
fulfillment: None,
};
let other_type = Obligation {
r#type: "log_only".into(),
detail: None,
fulfillment: Some(ObligationFulfillment {
endpoint: REQUEST_REDACTION_PATH.into(),
method: "POST".into(),
phase: PHASE_REQUEST.into(),
content_types: None,
}),
};
assert!(!has_request_redaction(&[
resp_phase,
no_fulfillment,
other_type
]));
assert!(!has_request_redaction(&[]));
}
/// `decide` decodes a full allow response, including its obligation list and evaluated
/// policies.
#[tokio::test]
async fn decide_parses_allow_with_obligation() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/decide"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"verdict": "allow",
"decision_id": "dec-1",
"trace_id": "04110a0b50577bbbdda23a00dcbaf6da",
"obligations": [{
"type": "redact_pii",
"fulfillment": {
"endpoint": "/api/v1/mcp/check-input",
"method": "POST",
"phase": "request",
"content_types": ["text/plain"],
},
}],
"evaluated_policies": ["sys_pii_email"],
"stage": "tool",
"expires_at": "2026-06-09T05:05:06.8Z",
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let d = client
.decide(DecideRequest::new("tool", "send to a@b.com"))
.await
.unwrap();
assert_eq!(d.verdict, "allow");
assert_eq!(d.decision_id.as_deref(), Some("dec-1"));
assert_eq!(d.obligations.len(), 1);
assert_eq!(d.obligations[0].r#type, "redact_pii");
assert!(has_request_redaction(&d.obligations));
assert_eq!(d.evaluated_policies, vec!["sys_pii_email"]);
}
/// A 200 response whose body says `deny` is returned as `Ok`, with the error text preserved
/// and no obligations.
#[tokio::test]
async fn decide_returns_deny_in_body_not_error() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/decide"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"verdict": "deny",
"error": "stage is required and must be one of: llm, tool, agent",
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let d = client
.decide(DecideRequest::new("", "x"))
.await
.expect("deny is a 200 body, not an error");
assert_eq!(d.verdict, "deny");
assert!(d.error.is_some());
assert!(d.obligations.is_empty());
}
/// An HTTP 401 from the decide endpoint surfaces as `AxonFlowError::ApiError` with
/// status 401.
#[tokio::test]
async fn decide_maps_401_to_api_error() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/decide"))
.respond_with(ResponseTemplate::new(401).set_body_string("unauthorized"))
.mount(&server)
.await;
let client = make_client(server.uri());
let err = client
.decide(DecideRequest::new("tool", "x"))
.await
.unwrap_err();
match err {
AxonFlowError::ApiError { status, .. } => assert_eq!(status, 401),
other => panic!("expected ApiError 401, got {other:?}"),
}
}
/// The decide request body carries the stage and query, and exactly one request is sent.
/// NOTE(review): only method, path and a partial JSON body are matched; nothing here
/// inspects an Authorization header despite the test name. Confirm whether an auth matcher
/// was intended.
#[tokio::test]
async fn decide_sends_basic_auth_and_body() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/decide"))
.and(body_partial_json(json!({"stage": "tool", "query": "hi"})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"verdict": "allow"})))
.expect(1)
.mount(&server)
.await;
let client = make_client(server.uri());
let d = client
.decide(DecideRequest::new("tool", "hi"))
.await
.unwrap();
assert_eq!(d.verdict, "allow");
}
/// When the engine reports a redaction, `fulfill_request` returns the engine's masked text
/// and sets the redaction flag. The mock also requires the gateway connector tag and the
/// `text/plain` content type in the request body.
#[tokio::test]
async fn fulfill_request_returns_engine_masked_content() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/mcp/check-input"))
.and(body_partial_json(
json!({"connector_type": "gateway", "content_type": "text/plain"}),
))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"allowed": true,
"redacted": true,
"redacted_statement": "Email jo****om and card 4****1",
"redaction_evaluated": true,
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let (content, did_redact) = client
.fulfill_request(
&allow_with(vec![redact_obligation()]),
"Email john and card 4111",
)
.await
.unwrap();
assert!(did_redact);
assert_eq!(content, "Email jo****om and card 4****1");
}
/// With no obligations the statement is returned untouched; no mock server is started.
#[tokio::test]
async fn fulfill_request_no_obligation_is_passthrough() {
let client = make_client("http://127.0.0.1:1".into());
let (content, did_redact) = client
.fulfill_request(&allow_with(vec![]), "untouched")
.await
.unwrap();
assert!(!did_redact);
assert_eq!(content, "untouched");
}
/// When the engine ran but redacted nothing, the original statement is returned and the
/// redaction flag stays false.
#[tokio::test]
async fn fulfill_request_engine_found_nothing_is_passthrough() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/mcp/check-input"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"allowed": true,
"redacted": false,
"redaction_evaluated": true,
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let (content, did_redact) = client
.fulfill_request(&allow_with(vec![redact_obligation()]), "no pii here")
.await
.unwrap();
assert!(!did_redact);
assert_eq!(content, "no pii here");
}
/// A `redact_pii` obligation with no fulfillment yields `ObligationNotFulfillable`.
#[tokio::test]
async fn fulfill_fails_closed_on_missing_request_phase_fulfillment() {
let ob = Obligation {
r#type: OBLIGATION_REDACT_PII.into(),
detail: None,
fulfillment: None,
};
let client = make_client("http://127.0.0.1:1".into());
let err = client
.fulfill_request(&allow_with(vec![ob]), "secret a@b.com")
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
/// A `redact_pii` obligation declared for the response phase yields
/// `ObligationNotFulfillable`.
#[tokio::test]
async fn fulfill_fails_closed_on_response_phase_obligation() {
let ob = Obligation {
r#type: OBLIGATION_REDACT_PII.into(),
detail: None,
fulfillment: Some(ObligationFulfillment {
endpoint: REQUEST_REDACTION_PATH.into(),
method: "POST".into(),
phase: PHASE_RESPONSE.into(),
content_types: None,
}),
};
let client = make_client("http://127.0.0.1:1".into());
let err = client
.fulfill_request(&allow_with(vec![ob]), "secret a@b.com")
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
/// A fulfillment that advertises only `image/png` yields `ObligationNotFulfillable`.
#[tokio::test]
async fn fulfill_fails_closed_on_unadvertised_content_type() {
let ob = Obligation {
r#type: OBLIGATION_REDACT_PII.into(),
detail: None,
fulfillment: Some(ObligationFulfillment {
endpoint: REQUEST_REDACTION_PATH.into(),
method: "POST".into(),
phase: PHASE_REQUEST.into(),
content_types: Some(vec!["image/png".into()]),
}),
};
let client = make_client("http://127.0.0.1:1".into());
let err = client
.fulfill_request(&allow_with(vec![ob]), "secret a@b.com")
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
/// A fulfillment endpoint whose path is not the request-redaction path yields
/// `ObligationNotFulfillable`.
#[tokio::test]
async fn fulfill_fails_closed_on_foreign_endpoint() {
let ob = Obligation {
r#type: OBLIGATION_REDACT_PII.into(),
detail: None,
fulfillment: Some(ObligationFulfillment {
endpoint: "https://evil.example.com/steal".into(),
method: "POST".into(),
phase: PHASE_REQUEST.into(),
content_types: Some(vec![CONTENT_TYPE_TEXT.into()]),
}),
};
let client = make_client("http://127.0.0.1:1".into());
let err = client
.fulfill_request(&allow_with(vec![ob]), "secret a@b.com")
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
/// An HTTP 500 from the engine yields `ObligationNotFulfillable`.
#[tokio::test]
async fn fulfill_fails_closed_on_engine_error() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/mcp/check-input"))
.respond_with(ResponseTemplate::new(500).set_body_string("boom"))
.mount(&server)
.await;
let client = make_client(server.uri());
let err = client
.fulfill_request(&allow_with(vec![redact_obligation()]), "secret a@b.com")
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
/// An engine response with `redaction_evaluated: false` yields `ObligationNotFulfillable`.
#[tokio::test]
async fn fulfill_fails_closed_when_redaction_evaluated_false() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/mcp/check-input"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"allowed": true,
"redacted": false,
"redaction_evaluated": false,
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let err = client
.fulfill_request(&allow_with(vec![redact_obligation()]), "secret a@b.com")
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
/// `redacted: true` without a `redacted_statement` yields `ObligationNotFulfillable`.
#[tokio::test]
async fn fulfill_fails_closed_when_redacted_true_without_statement() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/mcp/check-input"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"allowed": true,
"redacted": true,
"redaction_evaluated": true,
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let err = client
.fulfill_request(&allow_with(vec![redact_obligation()]), "secret a@b.com")
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
/// A response that omits `redaction_evaluated` entirely yields `ObligationNotFulfillable`,
/// even though it carries a masked statement.
#[tokio::test]
async fn fulfill_fails_closed_when_redaction_evaluated_absent() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/mcp/check-input"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"allowed": true,
"redacted": true,
"redacted_statement": "Email jo****om",
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let err = client
.fulfill_request(&allow_with(vec![redact_obligation()]), "Email john")
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
/// An obligation of a type other than `redact_pii` is skipped even without a fulfillment;
/// the statement passes through unchanged.
#[tokio::test]
async fn fulfill_ignores_non_redact_obligation_types() {
let ob = Obligation {
r#type: "audit_only".into(),
detail: None,
fulfillment: None,
};
let client = make_client("http://127.0.0.1:1".into());
let (content, did_redact) = client
.fulfill_request(&allow_with(vec![ob]), "left alone")
.await
.unwrap();
assert!(!did_redact);
assert_eq!(content, "left alone");
}
/// End to end: an allow verdict with a redaction obligation returns the engine's masked
/// text, and the raw card number is absent from it.
#[tokio::test]
async fn decide_and_fulfill_allow_redacts() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/decide"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"verdict": "allow",
"obligations": [{
"type": "redact_pii",
"fulfillment": {
"endpoint": "/api/v1/mcp/check-input",
"phase": "request",
"content_types": ["text/plain"],
},
}],
})))
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/api/v1/mcp/check-input"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"allowed": true,
"redacted": true,
"redacted_statement": "card 4****1",
"redaction_evaluated": true,
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let (verdict, content, decision) = client
.decide_and_fulfill(DecideRequest::new("tool", "card 4111111111111111"))
.await
.unwrap();
assert_eq!(verdict, "allow");
assert_eq!(content, "card 4****1");
assert_eq!(decision.verdict, "allow");
assert!(!content.contains("4111111111111111"));
}
/// A deny verdict returns the original query as content.
/// NOTE(review): no check-input mock is mounted and no call count is asserted, so the
/// "without engine call" part of the name is not verified directly.
#[tokio::test]
async fn decide_and_fulfill_deny_returns_original_without_engine_call() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/decide"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"verdict": "deny",
"reasons": ["blocked by policy"],
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let (verdict, content, _) = client
.decide_and_fulfill(DecideRequest::new("tool", "original query"))
.await
.unwrap();
assert_eq!(verdict, "deny");
assert_eq!(content, "original query");
}
/// An allow verdict whose obligation names a foreign endpoint produces
/// `ObligationNotFulfillable` rather than handing back the original query.
#[tokio::test]
async fn decide_and_fulfill_unfulfillable_surfaces_error_not_original() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/decide"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"verdict": "allow",
"obligations": [{
"type": "redact_pii",
"fulfillment": {
"endpoint": "https://evil.example.com/steal",
"phase": "request",
},
}],
})))
.mount(&server)
.await;
let client = make_client(server.uri());
let err = client
.decide_and_fulfill(DecideRequest::new("tool", "leak me a@b.com"))
.await
.unwrap_err();
assert!(matches!(err, AxonFlowError::ObligationNotFulfillable(_)));
}
}