// raxit-core 0.1.2
//
// Core security scanning engine for AI agent applications
// Documentation
//! Trust Boundary Analysis - Meta's "Rule of Two"
//!
//! A component may combine at most two of the following three properties;
//! it is non-compliant if it has all three:
//! [A] Untrusted input (user input, external APIs)
//! [B] Sensitive access (secrets, databases, file system)
//! [C] External actions (API calls, commands, writes)

use crate::error::Result;
use crate::schema::{ScanResult, TrustBoundary};

/// Run the Rule of Two trust boundary analysis over a whole scan.
///
/// Produces one [`TrustBoundary`] per component: every entry of
/// `result.agents` first, followed by every entry of `result.tools`, each
/// group in its original order.
///
/// # Errors
///
/// The current implementation has no failure path and always returns `Ok`.
pub fn analyze(result: &ScanResult) -> Result<Vec<TrustBoundary>> {
    let agent_boundaries = result.agents.iter().map(|agent| analyze_agent(agent, result));
    let tool_boundaries = result.tools.iter().map(|tool| analyze_tool(tool));

    // `chain` drains the agent iterator completely before touching tools,
    // so the output order matches two sequential loops.
    Ok(agent_boundaries.chain(tool_boundaries).collect())
}

/// Evaluate a single agent against the Rule of Two.
///
/// Probes the three properties ([A] untrusted input, [B] sensitive access,
/// [C] external actions) with the `check_*_agent` helpers. The agent is
/// compliant unless all three hold at once; a non-compliant agent receives
/// exactly one violation message that names it.
///
/// `_result` is accepted but not used by this function.
fn analyze_agent(agent: &crate::schema::Agent, _result: &ScanResult) -> TrustBoundary {
    let untrusted = check_untrusted_input_agent(agent);
    let sensitive = check_sensitive_access_agent(agent);
    let external = check_external_actions_agent(agent);

    // "Not all three at once": missing any single property is enough.
    let compliant = !untrusted || !sensitive || !external;

    let violations = if compliant {
        Vec::new()
    } else {
        vec![format!(
            "Agent '{}' violates Rule of Two: has untrusted input, sensitive access, and external actions",
            agent.name
        )]
    };

    TrustBoundary {
        id: format!("boundary_{}", agent.id),
        component_id: agent.id.clone(),
        component_type: String::from("agent"),
        has_untrusted_input: untrusted,
        has_sensitive_access: sensitive,
        has_external_actions: external,
        compliant,
        violations,
        location: agent.location.clone(),
    }
}

/// Evaluate a single tool against the Rule of Two.
///
/// Probes the three properties ([A] untrusted input, [B] sensitive access,
/// [C] external actions) with the `check_*_tool` helpers. The tool is
/// compliant unless all three hold at once; a non-compliant tool receives
/// exactly one violation message that names it.
fn analyze_tool(tool: &crate::schema::Tool) -> TrustBoundary {
    let untrusted = check_untrusted_input_tool(tool);
    let sensitive = check_sensitive_access_tool(tool);
    let external = check_external_actions_tool(tool);

    // "Not all three at once": missing any single property is enough.
    let compliant = !untrusted || !sensitive || !external;

    let violations = if compliant {
        Vec::new()
    } else {
        vec![format!(
            "Tool '{}' violates Rule of Two: has untrusted input, sensitive access, and external actions",
            tool.name
        )]
    };

    TrustBoundary {
        id: format!("boundary_{}", tool.id),
        component_id: tool.id.clone(),
        component_type: String::from("tool"),
        has_untrusted_input: untrusted,
        has_sensitive_access: sensitive,
        has_external_actions: external,
        compliant,
        violations,
        location: tool.location.clone(),
    }
}

/// Property [A] for an agent: does its source file look like it takes
/// untrusted input?
///
/// Scans the entire contents of `agent.location.file`, not just the agent's
/// own line span. If the file cannot be read as UTF-8 text the check is
/// best-effort and reports `false`.
fn check_untrusted_input_agent(agent: &crate::schema::Agent) -> bool {
    std::fs::read_to_string(&agent.location.file)
        .map(|source| has_untrusted_input_patterns(&source))
        .unwrap_or(false)
}

/// Property [B] for an agent: does its source file look like it touches
/// sensitive resources?
///
/// Scans the entire contents of `agent.location.file`, not just the agent's
/// own line span. If the file cannot be read as UTF-8 text the check is
/// best-effort and reports `false`.
fn check_sensitive_access_agent(agent: &crate::schema::Agent) -> bool {
    std::fs::read_to_string(&agent.location.file)
        .map(|source| has_sensitive_access_patterns(&source))
        .unwrap_or(false)
}

/// Property [C] for an agent: does its source file look like it performs
/// external actions?
///
/// Scans the entire contents of `agent.location.file`, not just the agent's
/// own line span. If the file cannot be read as UTF-8 text the check is
/// best-effort and reports `false`.
fn check_external_actions_agent(agent: &crate::schema::Agent) -> bool {
    std::fs::read_to_string(&agent.location.file)
        .map(|source| has_external_action_patterns(&source))
        .unwrap_or(false)
}

/// Property [A] for a tool: does it receive untrusted input?
///
/// Unlike the other checks this one does not read source code; it reports
/// `true` as soon as any entry in `tool.data_flows` has a `taint_level`
/// equal to `"high"`, and `false` when there is none (including when the
/// tool has no data flows at all).
fn check_untrusted_input_tool(tool: &crate::schema::Tool) -> bool {
    for flow in tool.data_flows.iter() {
        if flow.taint_level == "high" {
            return true;
        }
    }
    false
}

/// Property [B] for a tool: does its source file look like it touches
/// sensitive resources?
///
/// Scans the entire contents of `tool.location.file`, not just the tool's
/// own line span. If the file cannot be read as UTF-8 text the check is
/// best-effort and reports `false`.
fn check_sensitive_access_tool(tool: &crate::schema::Tool) -> bool {
    std::fs::read_to_string(&tool.location.file)
        .map(|source| has_sensitive_access_patterns(&source))
        .unwrap_or(false)
}

/// Property [C] for a tool: does its source file look like it performs
/// external actions?
///
/// Scans the entire contents of `tool.location.file`, not just the tool's
/// own line span. If the file cannot be read as UTF-8 text the check is
/// best-effort and reports `false`.
fn check_external_actions_tool(tool: &crate::schema::Tool) -> bool {
    std::fs::read_to_string(&tool.location.file)
        .map(|source| has_external_action_patterns(&source))
        .unwrap_or(false)
}

// Pattern matching functions

/// Check if code contains untrusted input patterns [A]
///
/// Returns `true` when `code` contains at least one of the markers below.
/// Matching is a plain, case-sensitive substring search over the whole
/// text, so it is a heuristic: a marker inside a comment or a longer
/// identifier (e.g. `raw_input(`) also counts. Empty input yields `false`.
fn has_untrusted_input_patterns(code: &str) -> bool {
    // User input patterns
    const USER_INPUT_PATTERNS: &[&str] = &[
        "input(",          // Python input()
        "request.form",    // Flask form data
        "request.json",    // Flask JSON body
        "request.args",    // Flask query parameters
        "request.files",   // Flask file uploads
        "request.get(",    // Dict-style lookup on a request object
        // Django exposes query parameters as the upper-case `request.GET`;
        // because matching is case-sensitive, the lower-case entry above
        // never caught `request.GET[...]` or `request.GET.get(...)`.
        "request.GET",     // Django GET parameters
        "request.POST",    // Django POST data
        "sys.argv",        // Command line arguments
        "@app.route",      // Flask route (API endpoints)
        "@api_view",       // Django REST framework
        "fastapi.Request", // FastAPI request
    ];

    USER_INPUT_PATTERNS
        .iter()
        .any(|pattern| code.contains(pattern))
}

/// Check if code contains sensitive access patterns [B]
///
/// Returns `true` when `code` contains at least one database, secret or
/// credential, or file system marker. Matching is a plain, case-sensitive
/// substring search, so broad markers such as `open(` or `.read(` fire on
/// any occurrence. Empty input yields `false`.
fn has_sensitive_access_patterns(code: &str) -> bool {
    /// True when any of `needles` occurs somewhere in `haystack`.
    fn contains_any(haystack: &str, needles: &[&str]) -> bool {
        needles.iter().any(|needle| haystack.contains(needle))
    }

    // Database clients and query execution
    let database = [
        "db.query(",
        "cursor.execute(",
        ".execute(", // Generic SQL execution
        "sqlite3.connect(",
        "psycopg2.connect(",
        "pymongo.MongoClient(",
        "redis.Redis(",
        "Session.query(", // SQLAlchemy
        "from sqlalchemy",
    ];

    // Environment lookups and credential-looking names
    let secrets = [
        "os.environ[",
        "os.getenv(",
        "getenv(",
        "AWS_ACCESS_KEY",
        "API_KEY",
        "PASSWORD",
        "SECRET",
        "TOKEN",
        "credentials",
        "boto3.client(", // AWS SDK
        "secrets.get(",
    ];

    // File handles and well-known sensitive locations
    let filesystem = [
        "open(", "Path(", "/etc/", "/var/", "~/.ssh/", "~/.aws/", ".read(", ".write(",
    ];

    contains_any(code, &database)
        || contains_any(code, &secrets)
        || contains_any(code, &filesystem)
}

/// Check if code contains external action patterns [C]
///
/// Returns `true` when `code` contains at least one HTTP client, process
/// execution, or file write marker. Matching is a plain, case-sensitive
/// substring search; `with open(` and `Path(` are counted even though they
/// only potentially write. Empty input yields `false`.
fn has_external_action_patterns(code: &str) -> bool {
    // Outbound HTTP
    const HTTP: &[&str] = &[
        "requests.get(",
        "requests.post(",
        "requests.put(",
        "requests.delete(",
        "httpx.get(",
        "httpx.post(",
        "urllib.request",
        "http.client",
        "aiohttp.ClientSession(",
    ];

    // Spawning processes / shell commands
    const PROCESS: &[&str] = &[
        "subprocess.run(",
        "subprocess.call(",
        "subprocess.Popen(",
        "os.system(",
        "os.popen(",
        "os.spawn",
        "commands.getoutput(",
    ];

    // Writing to the file system
    const WRITE: &[&str] = &[
        ".write(",
        ".write_text(",
        ".write_bytes(",
        "with open(", // Potentially writes
        "Path(",      // Potentially writes
    ];

    HTTP.iter()
        .chain(PROCESS)
        .chain(WRITE)
        .any(|marker| code.contains(marker))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{Agent, SourceLocation};

    /// An agent whose source file cannot be read has none of the three
    /// properties and is therefore compliant.
    #[test]
    fn test_compliant_agent() {
        // NOTE(review): relies on "test.py" not existing in the working
        // directory, so every file-based check falls back to `false`.
        let location = SourceLocation {
            file: "test.py".to_string(),
            line: 1,
            end_line: Some(10),
            function: None,
        };
        let agent = Agent {
            id: "test_agent".to_string(),
            name: "Test Agent".to_string(),
            location,
            model_id: None,
            tool_ids: Vec::new(),
            memory_id: None,
            system_prompt: None,
            result_type: None,
            deps_type: None,
        };

        let scan = ScanResult::new();
        let boundary = analyze_agent(&agent, &scan);

        assert!(boundary.compliant);
        assert!(boundary.violations.is_empty());
    }

    /// [A] markers are detected; unrelated code is not.
    #[test]
    fn test_untrusted_input_patterns() {
        let flagged = [
            "user_data = input('Enter: ')",
            "data = request.form['name']",
            "json_data = request.json",
            "@app.route('/api')",
        ];
        for snippet in flagged.iter() {
            assert!(has_untrusted_input_patterns(snippet));
        }

        assert!(!has_untrusted_input_patterns("print('hello')"));
    }

    /// [B] markers are detected; unrelated code is not.
    #[test]
    fn test_sensitive_access_patterns() {
        let flagged = [
            "result = db.query('SELECT * FROM users')",
            "cursor.execute(sql)",
            "api_key = os.getenv('API_KEY')",
            "with open('/etc/passwd') as f:",
        ];
        for snippet in flagged.iter() {
            assert!(has_sensitive_access_patterns(snippet));
        }

        assert!(!has_sensitive_access_patterns("print('hello')"));
    }

    /// [C] markers are detected; unrelated code is not.
    #[test]
    fn test_external_action_patterns() {
        let flagged = [
            "response = requests.get(url)",
            "subprocess.run(['ls', '-la'])",
            "file.write(data)",
            "Path('file.txt').write_text(content)",
        ];
        for snippet in flagged.iter() {
            assert!(has_external_action_patterns(snippet));
        }

        assert!(!has_external_action_patterns("print('hello')"));
    }

    /// A snippet combining A + B + C trips all three detectors.
    #[test]
    fn test_rule_of_two_violation() {
        // Code with all three violations (A + B + C)
        let snippet = r#"
import os
import requests

def process_user_request():
    # [A] Untrusted input
    user_input = input("Enter URL: ")

    # [B] Sensitive access
    api_key = os.getenv("API_KEY")

    # [C] External action
    response = requests.get(user_input, headers={"Authorization": api_key})
    return response.text
        "#;

        let detectors: [fn(&str) -> bool; 3] = [
            has_untrusted_input_patterns,
            has_sensitive_access_patterns,
            has_external_action_patterns,
        ];
        for detector in detectors.iter() {
            assert!(detector(snippet));
        }
    }
}