raxit-core 0.1.2

Core security scanning engine for AI agent applications
Documentation
//! Data Provenance Analyzer - CaMeL-style Taint Analysis
//!
//! Inspired by the CaMeL ("CApabilities for MachinE Learning") prompt-injection
//! defense from Google DeepMind, this module tracks data flow from untrusted
//! sources through transformations to sinks.
//!
//! Tracks:
//! - Data sources (user input, file reads, API calls)
//! - Data transformations (function calls, assignments)
//! - Data sinks (print, file writes, API sends)
//! - Taint propagation through variable assignments

use crate::error::Result;
use crate::schema::{DataFlow, ProvenanceFinding, ScanResult, SourceLocation};
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::{HashMap, HashSet};

/// Data source patterns (untrusted input).
///
/// Each entry is `(source_type, regex)`. `source_type` is the label recorded as
/// the origin of a tainted variable; `regex` is matched against one line of the
/// scanned text at a time. The patterns target Python-style calls and accessors
/// (e.g. `input(`, `requests.get(`, `os.getenv(`).
static SOURCE_PATTERNS: &[(&str, &str)] = &[
    // No leading `\b`, so names that merely end in `input` (e.g. `get_input(`) match too.
    ("user_input", r"input\s*\("),
    // Matches any `open(` call regardless of the mode argument.
    ("file_read", r"open\s*\(|\.read\(|\.readlines\("),
    // `requests.post(` / `requests.put(` are also listed as sinks in `SINK_PATTERNS`.
    ("api_call", r"requests\.(get|post|put|delete)\("),
    ("httpx_call", r"httpx\.(get|post)\("),
    // Attribute access only; no call parenthesis is required.
    ("form_data", r"request\.(form|json|args|files)"),
    ("command_args", r"sys\.argv"),
    ("env_var", r"os\.environ\[|os\.getenv\("),
];

/// Data sink patterns (outputs).
///
/// Each entry is `(sink_type, regex)`. `sink_type` is the label reported on a
/// finding and is also the key `determine_severity` switches on; `regex` is
/// matched against one line of the scanned text at a time.
static SINK_PATTERNS: &[(&str, &str)] = &[
    // `\b` keeps names such as `pprint(` or `blueprint(` from matching.
    ("print", r"\bprint\s*\("),
    ("file_write", r"\.write\(|\.write_text\("),
    ("api_send", r"requests\.(post|put)\("),
    ("httpx_send", r"httpx\.post\("),
    ("subprocess", r"subprocess\.(run|call|Popen)\("),
    // Method-name match only: any `.insert(`, `.execute(` or `.query(` call qualifies,
    // whatever the receiver is.
    ("database_insert", r"\.insert\(|\.execute\(|\.query\("),
    // Method-name match only: any `.chat(`, `.complete(` or `.generate(` call qualifies.
    ("llm_call", r"\.chat\(|\.complete\(|\.generate\("),
];

/// Assignment pattern for tracking taint propagation.
///
/// Capture group 1 is the assigned name (`\w+`); capture group 2 is everything
/// after the `=` on that line. The pattern is unanchored, so it matches the
/// first `name = ...` found anywhere in the line. That includes `==`
/// comparisons (`a == b` captures `a`) and keyword arguments (`f(x=1)`
/// captures `x`).
static ASSIGNMENT_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"(\w+)\s*=\s*(.+)").unwrap());

/// Function call pattern for transformations.
///
/// Capture group 1 is the callee name, capture group 2 the argument text up to
/// the first `)` (so nested calls are not captured in full). Nothing in the
/// visible part of this file references it, hence the `dead_code` allowance.
#[allow(dead_code)]
static FUNCTION_CALL_PATTERN: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(\w+)\s*\(([^)]*)\)").unwrap());

/// Taint tracking state accumulated while scanning one file.
///
/// `scan_file` creates a fresh instance per file, fills it during its first
/// pass and consults it when checking sinks.
#[derive(Debug, Clone)]
struct TaintState {
    /// Variables that are tainted (contain untrusted data), either directly
    /// from a source or through propagation.
    tainted_vars: HashSet<String>,

    /// Variable assignments (var -> expression). A later assignment to the
    /// same name overwrites the earlier entry. The visible code only writes
    /// to this map; nothing here reads it back.
    assignments: HashMap<String, String>,

    /// Data flows discovered. One entry is appended per `taint_variable`
    /// call; variables tainted only through propagation get no entry.
    flows: Vec<DataFlow>,
}

impl TaintState {
    /// Creates an empty state: no tainted variables, assignments or flows.
    fn new() -> Self {
        Self {
            tainted_vars: HashSet::new(),
            assignments: HashMap::new(),
            flows: Vec::new(),
        }
    }

    /// Marks `var` as tainted and records a [`DataFlow`] whose origin is `source`.
    ///
    /// A flow entry is appended on every call, so a variable that is tainted
    /// twice ends up with two entries. The taint level is always `"high"`.
    fn taint_variable(&mut self, var: String, source: String) {
        self.tainted_vars.insert(var.clone());
        self.flows.push(DataFlow {
            variable: var,
            source,
            readers: Vec::new(),
            writers: Vec::new(),
            taint_level: "high".to_string(),
        });
    }

    /// Returns `true` if `var` is currently marked as tainted.
    // Kept as a convenience query; no caller in this file uses it yet.
    #[allow(dead_code)]
    fn is_tainted(&self, var: &str) -> bool {
        self.tainted_vars.contains(var)
    }

    /// Propagates taint through an assignment `target = source_expr`.
    ///
    /// `target` becomes tainted when `source_expr` mentions any already-tainted
    /// variable as a whole identifier. Matching on identifier boundaries (not
    /// raw substrings) keeps a tainted `x` from tainting `msg = "text"`.
    /// Taint is only ever added here; it is never removed. No [`DataFlow`]
    /// entry is recorded for propagated taint.
    fn propagate_taint(&mut self, target: &str, source_expr: &str) {
        let reads_tainted = self
            .tainted_vars
            .iter()
            .any(|var| Self::mentions_identifier(source_expr, var));

        if reads_tainted {
            self.tainted_vars.insert(target.to_string());
        }
    }

    /// Returns `true` if `ident` occurs in `text` as a standalone identifier,
    /// i.e. not directly preceded or followed by an alphanumeric character or `_`.
    fn mentions_identifier(text: &str, ident: &str) -> bool {
        fn is_ident_char(c: char) -> bool {
            c.is_alphanumeric() || c == '_'
        }

        // An empty needle would "match" at every position; treat it as absent.
        if ident.is_empty() {
            return false;
        }

        text.match_indices(ident).any(|(start, hit)| {
            let end = start + hit.len();
            let open_left = text[..start]
                .chars()
                .next_back()
                .map_or(true, |c| !is_ident_char(c));
            let open_right = text[end..]
                .chars()
                .next()
                .map_or(true, |c| !is_ident_char(c));
            open_left && open_right
        })
    }
}

/// Run the provenance scan over every file listed in the scan manifest.
///
/// Each path in `result.manifest.files` is read from disk and handed to
/// `scan_file`; the per-file findings are concatenated in manifest order.
/// Files that cannot be read as text are skipped without reporting an error.
///
/// # Errors
///
/// Returns the first error produced by `scan_file`, if any.
pub fn analyze(result: &ScanResult) -> Result<Vec<ProvenanceFinding>> {
    let mut collected = Vec::new();

    for file_path in &result.manifest.files {
        // Best effort: an unreadable file contributes no findings.
        let source_text = match std::fs::read_to_string(file_path) {
            Ok(text) => text,
            Err(_) => continue,
        };

        collected.extend(scan_file(file_path, &source_text)?);
    }

    Ok(collected)
}

/// Scan a single file for data provenance issues
fn scan_file(file_path: &str, content: &str) -> Result<Vec<ProvenanceFinding>> {
    let mut findings = Vec::new();
    let mut taint_state = TaintState::new();

    let lines: Vec<&str> = content.lines().collect();

    // First pass: identify sources and track taint
    for (line_num, line) in lines.iter().enumerate() {
        let _line_number = (line_num + 1) as u32;

        // Check for data sources
        for (source_type, pattern_str) in SOURCE_PATTERNS {
            let pattern = Regex::new(pattern_str).unwrap();
            if pattern.is_match(line) {
                // Extract variable being assigned
                if let Some(var_name) = extract_assignment_target(line) {
                    taint_state.taint_variable(var_name.clone(), source_type.to_string());
                }
            }
        }

        // Track variable assignments for taint propagation
        if let Some(captures) = ASSIGNMENT_PATTERN.captures(line) {
            if let (Some(target), Some(expr)) = (captures.get(1), captures.get(2)) {
                let target_var = target.as_str();
                let source_expr = expr.as_str();

                // Propagate taint if source expression contains tainted variables
                taint_state.propagate_taint(target_var, source_expr);
                taint_state
                    .assignments
                    .insert(target_var.to_string(), source_expr.to_string());
            }
        }
    }

    // Second pass: check for tainted data reaching sinks
    for (line_num, line) in lines.iter().enumerate() {
        let line_number = (line_num + 1) as u32;

        for (sink_type, pattern_str) in SINK_PATTERNS {
            let pattern = Regex::new(pattern_str).unwrap();
            if pattern.is_match(line) {
                // Check if any tainted variable is used in this sink
                let tainted_vars_in_line: Vec<String> = taint_state
                    .tainted_vars
                    .iter()
                    .filter(|var| line.contains(var.as_str()))
                    .cloned()
                    .collect();

                if !tainted_vars_in_line.is_empty() {
                    findings.push(ProvenanceFinding {
                        id: format!(
                            "provenance_{}_{}",
                            file_path.replace(['/', '.'], "_"),
                            line_number
                        ),
                        finding_type: "tainted_sink".to_string(),
                        source_type: "untrusted_input".to_string(),
                        sink_type: sink_type.to_string(),
                        tainted_variables: tainted_vars_in_line.clone(),
                        location: SourceLocation {
                            file: file_path.to_string(),
                            line: line_number,
                            end_line: Some(line_number),
                            function: None,
                        },
                        severity: determine_severity(sink_type),
                        message: format!(
                            "Tainted data from untrusted source reaches {}: {}",
                            sink_type,
                            tainted_vars_in_line.join(", ")
                        ),
                        data_flow: extract_data_flow(&taint_state, &tainted_vars_in_line),
                    });
                }
            }
        }
    }

    Ok(findings)
}

/// Return the name on the left-hand side of the first assignment-like match in `line`.
///
/// This is capture group 1 of `ASSIGNMENT_PATTERN`, trimmed and copied into an
/// owned `String`. Returns `None` when the pattern does not match.
fn extract_assignment_target(line: &str) -> Option<String> {
    let captures = ASSIGNMENT_PATTERN.captures(line)?;
    let target = captures.get(1)?;
    Some(target.as_str().trim().to_string())
}

/// Describe where the given tainted variables came from.
///
/// For every recorded flow whose variable is listed in `vars`, emits
/// `"<variable> (<source>)"`, in the order the flows were recorded, and joins
/// the pieces with `" → "`. Returns `None` when `vars` is empty or none of the
/// variables has a recorded flow.
fn extract_data_flow(state: &TaintState, vars: &[String]) -> Option<String> {
    let mut steps: Vec<String> = Vec::new();

    // With an empty `vars` nothing can match, so the list simply stays empty.
    for flow in &state.flows {
        if vars.contains(&flow.variable) {
            steps.push(format!("{} ({})", flow.variable, flow.source));
        }
    }

    if steps.is_empty() {
        return None;
    }
    Some(steps.join(" → "))
}

/// Determine the severity label for a finding from its sink type.
///
/// | sink type                                          | severity     |
/// |----------------------------------------------------|--------------|
/// | `subprocess`, `database_insert`                    | `"critical"` |
/// | `file_write`, `api_send`, `httpx_send`, `llm_call` | `"high"`     |
/// | `print`                                            | `"medium"`   |
/// | anything else                                      | `"low"`      |
///
/// `httpx_send` is rated the same as `api_send`: both are outbound HTTP
/// requests carrying tainted data, differing only in the client library.
fn determine_severity(sink_type: &str) -> String {
    let level = match sink_type {
        "subprocess" | "database_insert" => "critical",
        "file_write" | "api_send" | "httpx_send" | "llm_call" => "high",
        "print" => "medium",
        _ => "low",
    };
    level.to_string()
}

#[cfg(test)]
mod tests {
    //! Unit tests for the provenance scanner. Each test feeds a small Python
    //! snippet to `scan_file` and inspects the resulting findings.

    use super::*;

    /// A value read from `input()` and printed directly is reported.
    #[test]
    fn test_simple_taint_flow() {
        let code = r#"
user_input = input("Enter data: ")
print(user_input)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.finding_type == "tainted_sink"));
        assert!(findings.iter().any(|f| f.sink_type == "print"));
    }

    /// Taint follows an assignment whose right-hand side uses a tainted name.
    #[test]
    fn test_taint_propagation() {
        let code = r#"
user_input = input("Enter data: ")
processed = user_input.upper()
print(processed)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        assert!(!findings.is_empty());
        assert!(findings
            .iter()
            .any(|f| f.tainted_variables.contains(&"processed".to_string())));
    }

    /// File contents written back out to another file are reported as `file_write`.
    #[test]
    fn test_file_read_to_write() {
        let code = r#"
data = open("input.txt").read()
with open("output.txt", "w") as f:
    f.write(data)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.sink_type == "file_write"));
    }

    /// An API response inserted into a database is a critical finding.
    #[test]
    fn test_api_to_database() {
        let code = r#"
response = requests.get(url).json()
db.insert(response)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.sink_type == "database_insert"));
        assert!(findings.iter().any(|f| f.severity == "critical"));
    }

    /// User input passed to `subprocess.run` is a critical finding.
    #[test]
    fn test_user_input_to_subprocess() {
        let code = r#"
command = input("Enter command: ")
subprocess.run(command)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.sink_type == "subprocess"));
        assert!(findings.iter().any(|f| f.severity == "critical"));
    }

    /// Form data forwarded to an LLM call is reported as `llm_call`.
    #[test]
    fn test_form_data_to_llm() {
        let code = r#"
user_query = request.form['query']
response = llm.chat(user_query)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.sink_type == "llm_call"));
    }

    /// A hard-coded value reaching a sink produces no finding.
    #[test]
    fn test_no_taint_safe_flow() {
        let code = r#"
safe_data = "hardcoded value"
print(safe_data)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        assert!(findings.is_empty());
    }

    /// Taint survives a chain of intermediate assignments.
    #[test]
    fn test_multiple_transformations() {
        let code = r#"
user_input = input("Enter: ")
step1 = transform1(user_input)
step2 = transform2(step1)
print(step2)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        assert!(!findings.is_empty());
        // All variables in the chain should be tainted
        let finding = findings.first().unwrap();
        assert!(finding.tainted_variables.iter().any(|v| v == "step2"));
    }

    /// Spot-checks the sink-type to severity mapping.
    #[test]
    fn test_severity_classification() {
        assert_eq!(determine_severity("subprocess"), "critical");
        assert_eq!(determine_severity("database_insert"), "critical");
        assert_eq!(determine_severity("file_write"), "high");
        assert_eq!(determine_severity("print"), "medium");
    }

    /// A finding on a directly tainted variable carries its origin in `data_flow`.
    #[test]
    fn test_data_flow_extraction() {
        let code = r#"
user_data = input("Enter: ")
processed = user_data.strip()
print(user_data)
        "#;
        let findings = scan_file("test.py", code).unwrap();

        let finding = findings
            .iter()
            .find(|f| f.sink_type == "print")
            .expect("print(user_data) should be reported");

        // The raw string starts with a newline, so `print(...)` sits on line 4.
        assert_eq!(finding.location.line, 4);
        assert_eq!(finding.tainted_variables, vec!["user_data".to_string()]);
        assert_eq!(
            finding.data_flow.as_deref(),
            Some("user_data (user_input)")
        );
    }
}