// raxit-core 0.1.2
//
// Core security scanning engine for AI agent applications
// Documentation
//! Memory Detection Analyzer
//!
//! Detects memory and database usage including:
//! - Vector stores (Chroma, Pinecone, FAISS, Qdrant, Weaviate)
//! - Databases (SQLite, PostgreSQL, MongoDB, Redis, SQLAlchemy)
//! - Caching systems (Redis; TTLCache is not matched by any pattern yet)
//! - File-based persistence (shelve, pickle, joblib)

use crate::error::Result;
use crate::schema::{MemoryFinding, ScanResult, SourceLocation};
use once_cell::sync::Lazy;
use regex::Regex;

/// Python import fragments that indicate a vector store dependency.
///
/// Each entry is substring-matched against a source line; only the first
/// matching entry is reported for a given line, so order decides which
/// fragment appears in the finding message.
///
/// Weaviate is listed so that its imports are reported like the other
/// backends: its client instantiation and technology name are already
/// recognised elsewhere in this module. The Weaviate entries sit at the end
/// so that lines matched by an earlier fragment keep their existing message.
static VECTOR_STORE_PATTERNS: &[&str] = &[
    "from langchain.vectorstores import",
    "from langchain_community.vectorstores import",
    "import chromadb",
    "from chromadb",
    "import pinecone",
    "from pinecone",
    "import faiss",
    "from faiss",
    "import qdrant_client",
    "from qdrant_client",
    "from llama_index.vector_stores import",
    "import weaviate",
    "from weaviate",
];

/// Python import fragments that indicate a database or cache client.
///
/// Each entry is matched with a plain substring test against a source line,
/// so a fragment also matches inside comments or longer module names (for
/// example `"from redis"` is contained in `from redis_om import ...`).
/// Only the first matching entry is reported per line.
static DATABASE_PATTERNS: &[&str] = &[
    "import sqlite3",
    "from sqlite3",
    "import psycopg2",
    "from psycopg2",
    "import pymongo",
    "from pymongo",
    "import redis",
    "from redis",
    "from sqlalchemy import",
    "import sqlalchemy",
    // NOTE(review): `extract_technology_from_import` has no "databases"
    // keyword, so this import is labelled "unknown" unless the line also
    // names a known backend.
    "from databases import",
];

/// Regexes that recognise a vector store client or index being constructed.
///
/// Each entry pairs the technology label reported in a finding with the
/// expression that detects it. The expressions are compiled once, on first
/// access; a pattern that failed to compile would panic at that point.
static VECTOR_STORE_INSTANTIATION: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    [
        ("chroma", r"Chroma\s*\("),
        ("chroma_client", r"chromadb\.Client\s*\("),
        ("pinecone_index", r"pinecone\.Index\s*\("),
        (
            "faiss_index",
            r"FAISS\.from_documents\s*\(|FAISS\.from_texts\s*\(",
        ),
        ("qdrant", r"QdrantClient\s*\("),
        ("weaviate", r"weaviate\.Client\s*\("),
    ]
    .iter()
    .map(|&(technology, expr)| (technology, Regex::new(expr).unwrap()))
    .collect::<Vec<_>>()
});

/// Regexes that recognise a database connection or engine being opened.
///
/// Each entry pairs the technology label reported in a finding with the
/// expression that detects it. The expressions are compiled once, on first
/// access; a pattern that failed to compile would panic at that point.
static DATABASE_INSTANTIATION: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    [
        ("sqlite", r"sqlite3\.connect\s*\("),
        ("postgresql", r"psycopg2\.connect\s*\("),
        ("mongodb", r"MongoClient\s*\("),
        ("redis", r"redis\.Redis\s*\(|redis\.StrictRedis\s*\("),
        ("sqlalchemy", r"create_engine\s*\("),
    ]
    .iter()
    .map(|&(technology, expr)| (technology, Regex::new(expr).unwrap()))
    .collect::<Vec<_>>()
});

/// Regexes that recognise state being persisted to or loaded from files.
///
/// Each entry pairs the technology label reported in a finding with the
/// expression that detects it. The expressions are compiled once, on first
/// access; a pattern that failed to compile would panic at that point.
static FILE_PERSISTENCE_PATTERNS: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    [
        ("shelve", r"shelve\.open\s*\("),
        ("pickle", r"pickle\.(dump|load)\s*\("),
        ("joblib", r"joblib\.(dump|load)\s*\("),
    ]
    .iter()
    .map(|&(technology, expr)| (technology, Regex::new(expr).unwrap()))
    .collect::<Vec<_>>()
});

/// Collect memory-usage findings for every file listed in the scan manifest.
///
/// Files are read from disk one at a time and handed to `scan_file`. A file
/// that cannot be read into a `String` (I/O failure or invalid UTF-8) is
/// skipped without reporting anything.
///
/// # Errors
///
/// Propagates any error returned by `scan_file`.
pub fn analyze(result: &ScanResult) -> Result<Vec<MemoryFinding>> {
    let mut all_findings = Vec::new();

    for path in &result.manifest.files {
        match std::fs::read_to_string(path) {
            Ok(source) => all_findings.extend(scan_file(path, &source)?),
            // Best effort: unreadable entries are ignored rather than failing the scan.
            Err(_) => continue,
        }
    }

    Ok(all_findings)
}

/// Walk `content` line by line and report every memory-related pattern found.
///
/// For each line the checks run in a fixed order: vector store imports,
/// database imports, vector store instantiations, database connections and
/// file-based persistence. The two import checks report only the first
/// matching fragment of their table; the three regex-based checks report
/// every matching table entry.
///
/// `file_path` is used purely for labelling (finding ids and locations); the
/// file itself is not opened here.
fn scan_file(file_path: &str, content: &str) -> Result<Vec<MemoryFinding>> {
    let mut collected = Vec::new();

    for (index, line) in content.lines().enumerate() {
        // `enumerate` counts from zero; reported line numbers start at 1.
        let line_number = (index + 1) as u32;

        // Single place where a finding for the current line is assembled.
        // The id is `<prefix>_<path with '/' and '.' replaced by '_'>_<line>`.
        let finding = |id_prefix: &str,
                       memory_type: &str,
                       technology: String,
                       configuration: Option<String>,
                       message: String| MemoryFinding {
            id: format!(
                "{id_prefix}_{}_{line_number}",
                file_path.replace(['/', '.'], "_")
            ),
            memory_type: memory_type.to_string(),
            technology,
            location: SourceLocation {
                file: file_path.to_string(),
                line: line_number,
                end_line: Some(line_number),
                function: None,
            },
            configuration,
            message,
        };

        // Vector store imports: first matching fragment only.
        if let Some(fragment) = VECTOR_STORE_PATTERNS
            .iter()
            .find(|candidate| line.contains(**candidate))
        {
            collected.push(finding(
                "memory_import",
                "vector_store_import",
                extract_technology_from_import(line),
                extract_configuration(line),
                format!("Vector store import detected: {fragment}"),
            ));
        }

        // Database imports: first matching fragment only, never any configuration.
        if let Some(fragment) = DATABASE_PATTERNS
            .iter()
            .find(|candidate| line.contains(**candidate))
        {
            collected.push(finding(
                "memory_db_import",
                "database_import",
                extract_technology_from_import(line),
                None,
                format!("Database import detected: {fragment}"),
            ));
        }

        // Vector store instantiation: every matching entry is reported.
        for (tech, _) in VECTOR_STORE_INSTANTIATION
            .iter()
            .filter(|(_, matcher)| matcher.is_match(line))
        {
            collected.push(finding(
                "memory_vector",
                "vector_store",
                tech.to_string(),
                extract_configuration(line),
                format!("Vector store instantiation: {tech}"),
            ));
        }

        // Database connections: every matching entry is reported.
        for (tech, _) in DATABASE_INSTANTIATION
            .iter()
            .filter(|(_, matcher)| matcher.is_match(line))
        {
            collected.push(finding(
                "memory_db",
                "database",
                tech.to_string(),
                extract_configuration(line),
                format!("Database connection: {tech}"),
            ));
        }

        // File-based persistence: every matching entry is reported.
        for (tech, _) in FILE_PERSISTENCE_PATTERNS
            .iter()
            .filter(|(_, matcher)| matcher.is_match(line))
        {
            collected.push(finding(
                "memory_file",
                "file_persistence",
                tech.to_string(),
                extract_configuration(line),
                format!("File persistence: {tech}"),
            ));
        }
    }

    Ok(collected)
}

/// Map an import line to the technology label used in findings.
///
/// The line is lower-cased and checked against a fixed, ordered list of
/// keywords; the label of the first keyword contained in the line wins.
/// Returns `"unknown"` when no keyword is present.
fn extract_technology_from_import(line: &str) -> String {
    // (keyword searched in the lower-cased line, label reported), in priority order.
    const KEYWORD_LABELS: [(&str, &str); 10] = [
        ("chroma", "chromadb"),
        ("pinecone", "pinecone"),
        ("faiss", "faiss"),
        ("qdrant", "qdrant"),
        ("weaviate", "weaviate"),
        ("sqlite", "sqlite"),
        ("psycopg", "postgresql"),
        ("pymongo", "mongodb"),
        ("redis", "redis"),
        ("sqlalchemy", "sqlalchemy"),
    ];

    let haystack = line.to_lowercase();

    KEYWORD_LABELS
        .iter()
        .find(|&&(keyword, _)| haystack.contains(keyword))
        .map(|&(_, label)| label)
        .unwrap_or("unknown")
        .to_string()
}

/// Extract configuration from instantiation line
fn extract_configuration(line: &str) -> Option<String> {
    // Look for common configuration parameters
    if line.contains("persist_directory") {
        extract_parameter(line, "persist_directory")
    } else if line.contains("collection_name") {
        extract_parameter(line, "collection_name")
    } else if line.contains("index_name") {
        extract_parameter(line, "index_name")
    } else if line.contains("host") {
        extract_parameter(line, "host")
    } else {
        None
    }
}

/// Extract the quoted value assigned to `param_name` in `line`.
///
/// Matches `param_name = "value"` or `param_name = 'value'` with optional
/// whitespace around `=`. The value must be non-empty and contain no quote
/// characters; the opening and closing quote are not required to be the same
/// kind. `param_name` is interpolated into the expression as-is (unescaped).
///
/// Returns `None` when the expression cannot be compiled or does not match.
fn extract_parameter(line: &str, param_name: &str) -> Option<String> {
    let expression = format!(r#"{param_name}[\s]*=[\s]*["']([^"']+)["']"#);
    let matcher = Regex::new(&expression).ok()?;

    let captures = matcher.captures(line)?;
    // Group 1 is the text between the quotes.
    let value = captures.get(1)?;
    Some(value.as_str().to_string())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Run `scan_file` over an in-memory snippet under a fixed dummy path.
    fn scan(snippet: &str) -> Vec<MemoryFinding> {
        scan_file("test.py", snippet).unwrap()
    }

    /// True when at least one finding carries the given technology label.
    fn reports_technology(found: &[MemoryFinding], technology: &str) -> bool {
        found.iter().any(|entry| entry.technology == technology)
    }

    /// True when at least one finding carries the given memory type.
    fn reports_memory_type(found: &[MemoryFinding], memory_type: &str) -> bool {
        found.iter().any(|entry| entry.memory_type == memory_type)
    }

    #[test]
    fn test_chroma_import_detection() {
        let found = scan("from langchain.vectorstores import Chroma");

        assert!(!found.is_empty());
        assert!(reports_memory_type(&found, "vector_store_import"));
        assert!(reports_technology(&found, "chromadb"));
    }

    #[test]
    fn test_chroma_instantiation() {
        let found = scan(r#"vectorstore = Chroma(persist_directory="./chroma_db")"#);

        assert!(!found.is_empty());
        assert!(reports_memory_type(&found, "vector_store"));
        assert!(reports_technology(&found, "chroma"));
    }

    #[test]
    fn test_faiss_detection() {
        let found = scan("index = FAISS.from_documents(docs, embeddings)");

        assert!(!found.is_empty());
        assert!(reports_technology(&found, "faiss_index"));
    }

    #[test]
    fn test_sqlite_connection() {
        let found = scan(r#"conn = sqlite3.connect("database.db")"#);

        assert!(!found.is_empty());
        assert!(reports_memory_type(&found, "database"));
        assert!(reports_technology(&found, "sqlite"));
    }

    #[test]
    fn test_redis_connection() {
        let found = scan("r = redis.Redis(host='localhost', port=6379)");

        assert!(!found.is_empty());
        assert!(reports_technology(&found, "redis"));
    }

    #[test]
    fn test_mongodb_connection() {
        let found = scan(r#"client = MongoClient("mongodb://localhost:27017/")"#);

        assert!(!found.is_empty());
        assert!(reports_technology(&found, "mongodb"));
    }

    #[test]
    fn test_sqlalchemy_engine() {
        let found = scan(r#"engine = create_engine("postgresql://user:pass@localhost/db")"#);

        assert!(!found.is_empty());
        assert!(reports_technology(&found, "sqlalchemy"));
    }

    #[test]
    fn test_configuration_extraction() {
        let found = scan(r#"vectorstore = Chroma(persist_directory="./my_db")"#);

        assert!(!found.is_empty());
        let chroma = found
            .iter()
            .find(|entry| entry.technology == "chroma")
            .unwrap();
        assert_eq!(chroma.configuration, Some("./my_db".to_string()));
    }

    #[test]
    fn test_no_false_positives() {
        // A line with no import, client or persistence call must yield nothing.
        let found = scan("print('Hello World')");

        assert!(found.is_empty());
    }
}