use crate::error::Result;
use crate::schema::{MemoryFinding, ScanResult, SourceLocation};
use once_cell::sync::Lazy;
use regex::Regex;
/// Substrings that mark a Python source line as importing a vector-store
/// library.
///
/// Lines are tested with a plain `str::contains`, in the order listed here, and
/// the first entry that matches is the one quoted in the finding's message.
/// New entries are therefore appended at the end so that the entry reported
/// for already-detected lines does not change.
static VECTOR_STORE_PATTERNS: &[&str] = &[
    // LangChain wrappers
    "from langchain.vectorstores import",
    "from langchain_community.vectorstores import",
    // Native client libraries
    "import chromadb",
    "from chromadb",
    "import pinecone",
    "from pinecone",
    "import faiss",
    "from faiss",
    "import qdrant_client",
    "from qdrant_client",
    // LlamaIndex wrappers
    "from llama_index.vector_stores import",
    // Weaviate: instantiation and technology labelling already know about it,
    // so its imports must be detected as well.
    "import weaviate",
    "from weaviate",
];
/// Substrings that mark a Python source line as importing a database client.
///
/// Matching is a plain substring test performed in list order; the first
/// entry found on a line is the one quoted in the resulting finding.
static DATABASE_PATTERNS: &[&str] = &[
    // SQLite (standard library)
    "import sqlite3",
    "from sqlite3",
    // PostgreSQL
    "import psycopg2",
    "from psycopg2",
    // MongoDB
    "import pymongo",
    "from pymongo",
    // Redis
    "import redis",
    "from redis",
    // SQLAlchemy
    "from sqlalchemy import",
    "import sqlalchemy",
    // `databases` package
    "from databases import",
];
/// `(technology label, regex)` pairs recognising call sites that create a
/// vector-store object.
///
/// The regexes are compiled once, on first access. Every pair is tried against
/// every line, so a single line can produce several findings.
static VECTOR_STORE_INSTANTIATION: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    const TABLE: &[(&str, &str)] = &[
        ("chroma", r"Chroma\s*\("),
        // Besides the bare `Client`, also accept the `PersistentClient`,
        // `HttpClient` and `EphemeralClient` constructors; the persistent one
        // in particular is what actually writes data to disk.
        (
            "chroma_client",
            r"chromadb\.(?:Persistent|Http|Ephemeral)?Client\s*\(",
        ),
        ("pinecone_index", r"pinecone\.Index\s*\("),
        (
            "faiss_index",
            r"FAISS\.from_documents\s*\(|FAISS\.from_texts\s*\(",
        ),
        ("qdrant", r"QdrantClient\s*\("),
        ("weaviate", r"weaviate\.Client\s*\("),
    ];

    TABLE
        .iter()
        .map(|&(tech, pattern)| {
            let regex = Regex::new(pattern).expect("hard-coded regex must compile");
            (tech, regex)
        })
        .collect()
});
/// `(technology label, regex)` pairs recognising call sites that open a
/// database connection or create an engine.
///
/// The regexes are compiled once, on first access.
static DATABASE_INSTANTIATION: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    const TABLE: &[(&str, &str)] = &[
        ("sqlite", r"sqlite3\.connect\s*\("),
        ("postgresql", r"psycopg2\.connect\s*\("),
        ("mongodb", r"MongoClient\s*\("),
        ("redis", r"redis\.Redis\s*\(|redis\.StrictRedis\s*\("),
        ("sqlalchemy", r"create_engine\s*\("),
    ];

    TABLE
        .iter()
        .map(|&(tech, pattern)| (tech, Regex::new(pattern).unwrap()))
        .collect()
});
/// `(technology label, regex)` pairs recognising calls that persist or restore
/// Python objects through files (`shelve.open`, `pickle`/`joblib` dump and
/// load).
///
/// The regexes are compiled once, on first access.
static FILE_PERSISTENCE_PATTERNS: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    const TABLE: &[(&str, &str)] = &[
        ("shelve", r"shelve\.open\s*\("),
        ("pickle", r"pickle\.(dump|load)\s*\("),
        ("joblib", r"joblib\.(dump|load)\s*\("),
    ];

    TABLE
        .iter()
        .map(|&(tech, pattern)| (tech, Regex::new(pattern).unwrap()))
        .collect()
});
/// Scans every file listed in `result.manifest.files` for memory and
/// persistence usage and returns all findings in manifest order.
///
/// Files that cannot be read as UTF-8 text (missing, unreadable, or not valid
/// UTF-8) are skipped without reporting an error.
///
/// # Errors
///
/// Propagates any error returned by `scan_file`.
pub fn analyze(result: &ScanResult) -> Result<Vec<MemoryFinding>> {
    let mut collected = Vec::new();

    for path in &result.manifest.files {
        match std::fs::read_to_string(path) {
            Ok(source) => collected.extend(scan_file(path, &source)?),
            // Best-effort scan: an unreadable file contributes no findings.
            Err(_) => continue,
        }
    }

    Ok(collected)
}
/// Scans the text of one source file, line by line, and returns a finding for
/// each detected memory or persistence usage.
///
/// For every line the checks run in this order, and findings are appended in
/// the same order:
///
/// 1. vector-store imports (`VECTOR_STORE_PATTERNS`, first matching entry only),
/// 2. database imports (`DATABASE_PATTERNS`, first matching entry only),
/// 3. vector-store instantiations (`VECTOR_STORE_INSTANTIATION`, every match),
/// 4. database connections (`DATABASE_INSTANTIATION`, every match),
/// 5. file persistence calls (`FILE_PERSISTENCE_PATTERNS`, every match).
///
/// `file_path` is only used for labelling: it is stored in each finding's
/// location and, with `/` and `.` replaced by `_`, embedded in the finding id.
/// Line numbers are 1-based.
///
/// The function currently has no failing path and always returns `Ok`.
fn scan_file(file_path: &str, content: &str) -> Result<Vec<MemoryFinding>> {
    // Every id for this file embeds the same sanitised path, so compute it once
    // instead of once per finding.
    let path_slug = file_path.replace(['/', '.'], "_");

    // Single place where a one-line finding is assembled; the five detection
    // kinds differ only in the values passed in.
    let finding = |id_prefix: &str,
                   memory_type: &str,
                   line_number: u32,
                   technology: String,
                   configuration: Option<String>,
                   message: String| MemoryFinding {
        id: format!("{id_prefix}_{path_slug}_{line_number}"),
        memory_type: memory_type.to_string(),
        technology,
        location: SourceLocation {
            file: file_path.to_string(),
            line: line_number,
            end_line: Some(line_number),
            function: None,
        },
        configuration,
        message,
    };

    let mut findings = Vec::new();

    for (index, line) in content.lines().enumerate() {
        let line_number = (index + 1) as u32;

        // Imports: at most one finding per category per line.
        if let Some(pattern) = VECTOR_STORE_PATTERNS.iter().find(|p| line.contains(**p)) {
            findings.push(finding(
                "memory_import",
                "vector_store_import",
                line_number,
                extract_technology_from_import(line),
                extract_configuration(line),
                format!("Vector store import detected: {pattern}"),
            ));
        }

        if let Some(pattern) = DATABASE_PATTERNS.iter().find(|p| line.contains(**p)) {
            findings.push(finding(
                "memory_db_import",
                "database_import",
                line_number,
                extract_technology_from_import(line),
                None,
                format!("Database import detected: {pattern}"),
            ));
        }

        // Instantiations: every matching regex yields its own finding.
        for (tech, pattern) in VECTOR_STORE_INSTANTIATION.iter() {
            if pattern.is_match(line) {
                findings.push(finding(
                    "memory_vector",
                    "vector_store",
                    line_number,
                    tech.to_string(),
                    extract_configuration(line),
                    format!("Vector store instantiation: {tech}"),
                ));
            }
        }

        for (tech, pattern) in DATABASE_INSTANTIATION.iter() {
            if pattern.is_match(line) {
                findings.push(finding(
                    "memory_db",
                    "database",
                    line_number,
                    tech.to_string(),
                    extract_configuration(line),
                    format!("Database connection: {tech}"),
                ));
            }
        }

        for (tech, pattern) in FILE_PERSISTENCE_PATTERNS.iter() {
            if pattern.is_match(line) {
                findings.push(finding(
                    "memory_file",
                    "file_persistence",
                    line_number,
                    tech.to_string(),
                    extract_configuration(line),
                    format!("File persistence: {tech}"),
                ));
            }
        }
    }

    Ok(findings)
}
/// Derives a technology label from an import line.
///
/// The line is lower-cased and searched for a fixed list of keywords; the
/// first keyword found (in table order) decides the label. Returns
/// `"unknown"` when none of the keywords occurs.
fn extract_technology_from_import(line: &str) -> String {
    // (keyword, label) pairs, checked top to bottom. New rows go at the end so
    // that lines already recognised keep their label.
    const TECHNOLOGIES: &[(&str, &str)] = &[
        ("chroma", "chromadb"),
        ("pinecone", "pinecone"),
        ("faiss", "faiss"),
        ("qdrant", "qdrant"),
        ("weaviate", "weaviate"),
        ("sqlite", "sqlite"),
        ("psycopg", "postgresql"),
        ("pymongo", "mongodb"),
        ("redis", "redis"),
        ("sqlalchemy", "sqlalchemy"),
        // `from databases import` is a detected import pattern; without this
        // row such lines were labelled "unknown".
        ("databases", "databases"),
    ];

    let lowered = line.to_lowercase();
    TECHNOLOGIES
        .iter()
        .find(|(keyword, _)| lowered.contains(*keyword))
        .map_or("unknown", |&(_, label)| label)
        .to_string()
}
fn extract_configuration(line: &str) -> Option<String> {
if line.contains("persist_directory") {
extract_parameter(line, "persist_directory")
} else if line.contains("collection_name") {
extract_parameter(line, "collection_name")
} else if line.contains("index_name") {
extract_parameter(line, "index_name")
} else if line.contains("host") {
extract_parameter(line, "host")
} else {
None
}
}
/// Extracts the quoted string value assigned to `param_name` on `line`.
///
/// Looks for `param_name = "value"` or `param_name = 'value'` (optional
/// whitespace around `=`) and returns `value`. The value must be non-empty and
/// contain no quote characters. Returns `None` when there is no such
/// assignment or when the generated pattern fails to compile.
///
/// `param_name` is interpolated into the regex verbatim, so it must be a plain
/// identifier without regex metacharacters. The leading `\b` ensures the name
/// is matched as a whole word start, so that e.g. `db_host="x"` is not
/// mistaken for `host`.
fn extract_parameter(line: &str, param_name: &str) -> Option<String> {
    let pattern = format!(r#"\b{param_name}\s*=\s*["']([^"']+)["']"#);
    let re = Regex::new(&pattern).ok()?;

    re.captures(line)
        .and_then(|caps| caps.get(1))
        .map(|value| value.as_str().to_string())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the scanner over an in-memory snippet labelled as `test.py`.
    fn scan(code: &str) -> Vec<MemoryFinding> {
        scan_file("test.py", code).unwrap()
    }

    /// True when at least one finding carries the given memory type.
    fn has_memory_type(findings: &[MemoryFinding], expected: &str) -> bool {
        findings.iter().any(|f| f.memory_type == expected)
    }

    /// True when at least one finding carries the given technology label.
    fn has_technology(findings: &[MemoryFinding], expected: &str) -> bool {
        findings.iter().any(|f| f.technology == expected)
    }

    #[test]
    fn test_chroma_import_detection() {
        let findings = scan("from langchain.vectorstores import Chroma");

        assert!(!findings.is_empty());
        assert!(has_memory_type(&findings, "vector_store_import"));
        assert!(has_technology(&findings, "chromadb"));
    }

    #[test]
    fn test_chroma_instantiation() {
        let findings = scan(r#"vectorstore = Chroma(persist_directory="./chroma_db")"#);

        assert!(!findings.is_empty());
        assert!(has_memory_type(&findings, "vector_store"));
        assert!(has_technology(&findings, "chroma"));
    }

    #[test]
    fn test_faiss_detection() {
        let findings = scan("index = FAISS.from_documents(docs, embeddings)");

        assert!(!findings.is_empty());
        assert!(has_technology(&findings, "faiss_index"));
    }

    #[test]
    fn test_sqlite_connection() {
        let findings = scan(r#"conn = sqlite3.connect("database.db")"#);

        assert!(!findings.is_empty());
        assert!(has_memory_type(&findings, "database"));
        assert!(has_technology(&findings, "sqlite"));
    }

    #[test]
    fn test_redis_connection() {
        let findings = scan("r = redis.Redis(host='localhost', port=6379)");

        assert!(!findings.is_empty());
        assert!(has_technology(&findings, "redis"));
    }

    #[test]
    fn test_mongodb_connection() {
        let findings = scan(r#"client = MongoClient("mongodb://localhost:27017/")"#);

        assert!(!findings.is_empty());
        assert!(has_technology(&findings, "mongodb"));
    }

    #[test]
    fn test_sqlalchemy_engine() {
        let findings = scan(r#"engine = create_engine("postgresql://user:pass@localhost/db")"#);

        assert!(!findings.is_empty());
        assert!(has_technology(&findings, "sqlalchemy"));
    }

    #[test]
    fn test_configuration_extraction() {
        let findings = scan(r#"vectorstore = Chroma(persist_directory="./my_db")"#);

        assert!(!findings.is_empty());
        let chroma = findings
            .iter()
            .find(|f| f.technology == "chroma")
            .unwrap();
        assert_eq!(chroma.configuration, Some("./my_db".to_string()));
    }

    #[test]
    fn test_no_false_positives() {
        let findings = scan("print('Hello World')");

        assert!(findings.is_empty());
    }
}