use crate::error::Result;
use crate::schema::{ScanResult, TrustBoundary};
pub fn analyze(result: &ScanResult) -> Result<Vec<TrustBoundary>> {
let mut boundaries = Vec::new();
for agent in &result.agents {
let boundary = analyze_agent(agent, result);
boundaries.push(boundary);
}
for tool in &result.tools {
let boundary = analyze_tool(tool);
boundaries.push(boundary);
}
Ok(boundaries)
}
fn analyze_agent(agent: &crate::schema::Agent, _result: &ScanResult) -> TrustBoundary {
let has_untrusted_input = check_untrusted_input_agent(agent);
let has_sensitive_access = check_sensitive_access_agent(agent);
let has_external_actions = check_external_actions_agent(agent);
let compliant = !(has_untrusted_input && has_sensitive_access && has_external_actions);
let mut violations = Vec::new();
if !compliant {
violations.push(format!(
"Agent '{}' violates Rule of Two: has untrusted input, sensitive access, and external actions",
agent.name
));
}
TrustBoundary {
id: format!("boundary_{}", agent.id),
component_id: agent.id.clone(),
component_type: "agent".to_string(),
has_untrusted_input,
has_sensitive_access,
has_external_actions,
compliant,
violations,
location: agent.location.clone(),
}
}
fn analyze_tool(tool: &crate::schema::Tool) -> TrustBoundary {
let has_untrusted_input = check_untrusted_input_tool(tool);
let has_sensitive_access = check_sensitive_access_tool(tool);
let has_external_actions = check_external_actions_tool(tool);
let compliant = !(has_untrusted_input && has_sensitive_access && has_external_actions);
let mut violations = Vec::new();
if !compliant {
violations.push(format!(
"Tool '{}' violates Rule of Two: has untrusted input, sensitive access, and external actions",
tool.name
));
}
TrustBoundary {
id: format!("boundary_{}", tool.id),
component_id: tool.id.clone(),
component_type: "tool".to_string(),
has_untrusted_input,
has_sensitive_access,
has_external_actions,
compliant,
violations,
location: tool.location.clone(),
}
}
fn check_untrusted_input_agent(agent: &crate::schema::Agent) -> bool {
if let Ok(source_code) = std::fs::read_to_string(&agent.location.file) {
has_untrusted_input_patterns(&source_code)
} else {
false
}
}
fn check_sensitive_access_agent(agent: &crate::schema::Agent) -> bool {
if let Ok(source_code) = std::fs::read_to_string(&agent.location.file) {
has_sensitive_access_patterns(&source_code)
} else {
false
}
}
fn check_external_actions_agent(agent: &crate::schema::Agent) -> bool {
if let Ok(source_code) = std::fs::read_to_string(&agent.location.file) {
has_external_action_patterns(&source_code)
} else {
false
}
}
fn check_untrusted_input_tool(tool: &crate::schema::Tool) -> bool {
tool.data_flows
.iter()
.any(|flow| flow.taint_level == "high")
}
fn check_sensitive_access_tool(tool: &crate::schema::Tool) -> bool {
if let Ok(source_code) = std::fs::read_to_string(&tool.location.file) {
has_sensitive_access_patterns(&source_code)
} else {
false
}
}
fn check_external_actions_tool(tool: &crate::schema::Tool) -> bool {
if let Ok(source_code) = std::fs::read_to_string(&tool.location.file) {
has_external_action_patterns(&source_code)
} else {
false
}
}
fn has_untrusted_input_patterns(code: &str) -> bool {
let user_input_patterns = [
"input(", "request.form", "request.json", "request.args", "request.files", "request.get(", "request.POST", "sys.argv", "@app.route", "@api_view", "fastapi.Request", ];
user_input_patterns
.iter()
.any(|pattern| code.contains(pattern))
}
fn has_sensitive_access_patterns(code: &str) -> bool {
let db_patterns = [
"db.query(",
"cursor.execute(",
".execute(", "sqlite3.connect(",
"psycopg2.connect(",
"pymongo.MongoClient(",
"redis.Redis(",
"Session.query(", "from sqlalchemy",
];
let secret_patterns = [
"os.environ[",
"os.getenv(",
"getenv(",
"AWS_ACCESS_KEY",
"API_KEY",
"PASSWORD",
"SECRET",
"TOKEN",
"credentials",
"boto3.client(", "secrets.get(",
];
let file_patterns = [
"open(", "Path(", "/etc/", "/var/", "~/.ssh/", "~/.aws/", ".read(", ".write(",
];
db_patterns.iter().any(|p| code.contains(p))
|| secret_patterns.iter().any(|p| code.contains(p))
|| file_patterns.iter().any(|p| code.contains(p))
}
fn has_external_action_patterns(code: &str) -> bool {
let http_patterns = [
"requests.get(",
"requests.post(",
"requests.put(",
"requests.delete(",
"httpx.get(",
"httpx.post(",
"urllib.request",
"http.client",
"aiohttp.ClientSession(",
];
let process_patterns = [
"subprocess.run(",
"subprocess.call(",
"subprocess.Popen(",
"os.system(",
"os.popen(",
"os.spawn",
"commands.getoutput(",
];
let write_patterns = [
".write(",
".write_text(",
".write_bytes(",
"with open(", "Path(", ];
http_patterns.iter().any(|p| code.contains(p))
|| process_patterns.iter().any(|p| code.contains(p))
|| write_patterns.iter().any(|p| code.contains(p))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{Agent, SourceLocation};
#[test]
fn test_compliant_agent() {
let agent = Agent {
id: "test_agent".to_string(),
name: "Test Agent".to_string(),
location: SourceLocation {
file: "test.py".to_string(),
line: 1,
end_line: Some(10),
function: None,
},
model_id: None,
tool_ids: Vec::new(),
memory_id: None,
system_prompt: None,
result_type: None,
deps_type: None,
};
let result = ScanResult::new();
let boundary = analyze_agent(&agent, &result);
assert!(boundary.compliant);
assert!(boundary.violations.is_empty());
}
#[test]
fn test_untrusted_input_patterns() {
assert!(has_untrusted_input_patterns("user_data = input('Enter: ')"));
assert!(has_untrusted_input_patterns("data = request.form['name']"));
assert!(has_untrusted_input_patterns("json_data = request.json"));
assert!(has_untrusted_input_patterns("@app.route('/api')"));
assert!(!has_untrusted_input_patterns("print('hello')"));
}
#[test]
fn test_sensitive_access_patterns() {
assert!(has_sensitive_access_patterns(
"result = db.query('SELECT * FROM users')"
));
assert!(has_sensitive_access_patterns("cursor.execute(sql)"));
assert!(has_sensitive_access_patterns(
"api_key = os.getenv('API_KEY')"
));
assert!(has_sensitive_access_patterns(
"with open('/etc/passwd') as f:"
));
assert!(!has_sensitive_access_patterns("print('hello')"));
}
#[test]
fn test_external_action_patterns() {
assert!(has_external_action_patterns("response = requests.get(url)"));
assert!(has_external_action_patterns(
"subprocess.run(['ls', '-la'])"
));
assert!(has_external_action_patterns("file.write(data)"));
assert!(has_external_action_patterns(
"Path('file.txt').write_text(content)"
));
assert!(!has_external_action_patterns("print('hello')"));
}
#[test]
fn test_rule_of_two_violation() {
let code = r#"
import os
import requests
def process_user_request():
# [A] Untrusted input
user_input = input("Enter URL: ")
# [B] Sensitive access
api_key = os.getenv("API_KEY")
# [C] External action
response = requests.get(user_input, headers={"Authorization": api_key})
return response.text
"#;
assert!(has_untrusted_input_patterns(code));
assert!(has_sensitive_access_patterns(code));
assert!(has_external_action_patterns(code));
}
}