use std::path::Path;
use agentic_codebase::engine::query::{
DependencyParams, ImpactParams, MatchMode, QueryEngine, SymbolLookupParams,
};
use agentic_codebase::format::{AcbReader, AcbWriter};
use agentic_codebase::graph::{CodeGraph, GraphBuilder};
use agentic_codebase::index::{EmbeddingIndex, LanguageIndex, PathIndex, SymbolIndex, TypeIndex};
use agentic_codebase::mcp::server::McpServer;
use agentic_codebase::parse::parser::{ParseOptions, Parser};
use agentic_codebase::semantic::analyzer::{AnalyzeOptions, SemanticAnalyzer};
use agentic_codebase::types::*;
fn create_multi_language_fixture(dir: &Path) {
let py_dir = dir.join("src").join("python");
std::fs::create_dir_all(&py_dir).unwrap();
std::fs::write(
py_dir.join("app.py"),
r#"
class UserService:
"""Service for managing users."""
def __init__(self, db):
self.db = db
def get_user(self, user_id: int):
"""Fetch a user by ID."""
return self.db.query(user_id)
def create_user(self, name: str, email: str):
"""Create a new user."""
user = {"name": name, "email": email}
return self.db.insert(user)
class OrderService:
"""Service for managing orders."""
def __init__(self, db, user_service: UserService):
self.db = db
self.user_service = user_service
def create_order(self, user_id: int, items: list):
"""Create a new order for a user."""
user = self.user_service.get_user(user_id)
order = {"user": user, "items": items}
return self.db.insert(order)
"#,
)
.unwrap();
std::fs::write(
py_dir.join("test_app.py"),
r#"
import unittest
class TestUserService(unittest.TestCase):
def test_get_user(self):
pass
def test_create_user(self):
pass
class TestOrderService(unittest.TestCase):
def test_create_order(self):
pass
"#,
)
.unwrap();
let rs_dir = dir.join("src").join("rust");
std::fs::create_dir_all(&rs_dir).unwrap();
std::fs::write(
rs_dir.join("lib.rs"),
r#"
pub mod auth;
pub mod handler;
pub trait Authenticator {
fn authenticate(&self, token: &str) -> bool;
}
pub struct JwtAuth {
secret: String,
}
impl Authenticator for JwtAuth {
fn authenticate(&self, token: &str) -> bool {
!token.is_empty()
}
}
pub fn process_request(auth: &dyn Authenticator, token: &str) -> Result<(), String> {
if auth.authenticate(token) {
Ok(())
} else {
Err("unauthorized".to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_process_request() {
let auth = JwtAuth { secret: "key".into() };
assert!(process_request(&auth, "valid").is_ok());
}
}
"#,
)
.unwrap();
let ts_dir = dir.join("src").join("typescript");
std::fs::create_dir_all(&ts_dir).unwrap();
std::fs::write(
ts_dir.join("api.ts"),
r#"
interface User {
id: number;
name: string;
email: string;
}
interface Order {
id: number;
userId: number;
items: string[];
}
class ApiClient {
private baseUrl: string;
constructor(baseUrl: string) {
this.baseUrl = baseUrl;
}
async getUser(id: number): Promise<User> {
const response = await fetch(`${this.baseUrl}/users/${id}`);
return response.json();
}
async createOrder(order: Order): Promise<Order> {
const response = await fetch(`${this.baseUrl}/orders`, {
method: 'POST',
body: JSON.stringify(order),
});
return response.json();
}
}
export function createApiClient(url: string): ApiClient {
return new ApiClient(url);
}
"#,
)
.unwrap();
}
fn build_large_graph(unit_count: usize) -> CodeGraph {
let mut builder = GraphBuilder::new(DEFAULT_DIMENSION);
builder = builder.add_unit(CodeUnit::new(
CodeUnitType::Module,
Language::Python,
"main_module".into(),
"main_module".into(),
"src/main.py".into(),
Span::new(1, 0, 100, 0),
));
for i in 1..unit_count {
let name = format!("function_{}", i);
let file = format!("src/module_{}.py", i % 50);
let lang = match i % 3 {
0 => Language::Rust,
1 => Language::Python,
_ => Language::TypeScript,
};
builder = builder.add_unit(CodeUnit::new(
CodeUnitType::Function,
lang,
name.clone(),
format!("module_{}.{}", i % 50, name),
file.into(),
Span::new(1, 0, 20, 0),
));
if i > 1 {
builder = builder.add_edge(Edge::new((i - 1) as u64, i as u64, EdgeType::Calls));
}
if i % 10 == 0 {
builder = builder.add_edge(Edge::new(i as u64, 0, EdgeType::Imports));
}
if i > 1 && i % 7 == 0 {
builder = builder.add_edge(Edge::new((i - 1) as u64, i as u64, EdgeType::Contains));
}
}
builder.build().unwrap()
}
#[test]
fn test_full_lifecycle() {
let tmp = tempfile::tempdir().unwrap();
let src_dir = tmp.path().join("project");
std::fs::create_dir_all(&src_dir).unwrap();
create_multi_language_fixture(&src_dir);
let parser = Parser::new();
let opts = ParseOptions::default();
let result = parser.parse_directory(&src_dir, &opts);
assert!(result.is_ok(), "Parse failed: {:?}", result.err());
let parse_result = result.unwrap();
assert!(
parse_result.units.len() >= 5,
"Expected at least 5 units, got {}",
parse_result.units.len()
);
let analyzer = SemanticAnalyzer::new();
let graph = analyzer
.analyze(parse_result.units, &AnalyzeOptions::default())
.unwrap();
assert!(graph.unit_count() >= 5, "Expected >= 5 units in graph");
let acb_path = tmp.path().join("project.acb");
let writer = AcbWriter::with_default_dimension();
writer.write_to_file(&graph, &acb_path).unwrap();
assert!(acb_path.exists());
let loaded = AcbReader::read_from_file(&acb_path).unwrap();
assert_eq!(loaded.unit_count(), graph.unit_count());
assert_eq!(loaded.edge_count(), graph.edge_count());
let engine = QueryEngine::new();
let results = engine
.symbol_lookup(
&loaded,
SymbolLookupParams {
name: "User".into(),
mode: MatchMode::Contains,
..Default::default()
},
)
.unwrap();
assert!(!results.is_empty(), "Should find units containing 'User'");
let sym_idx = SymbolIndex::build(&loaded);
assert!(!sym_idx.is_empty());
let lang_idx = LanguageIndex::build(&loaded);
assert!(!lang_idx.languages().is_empty());
for (i, unit) in loaded.units().iter().enumerate() {
let orig = &graph.units()[i];
assert_eq!(unit.name, orig.name, "Name mismatch at unit {}", i);
assert_eq!(
unit.unit_type, orig.unit_type,
"Type mismatch at unit {}",
i
);
assert_eq!(
unit.language, orig.language,
"Language mismatch at unit {}",
i
);
}
}
#[test]
fn test_large_graph() {
let graph = build_large_graph(1000);
assert_eq!(graph.unit_count(), 1000);
assert!(graph.edge_count() > 900, "Should have many edges");
let tmp = tempfile::tempdir().unwrap();
let acb_path = tmp.path().join("large.acb");
let writer = AcbWriter::with_default_dimension();
writer.write_to_file(&graph, &acb_path).unwrap();
let loaded = AcbReader::read_from_file(&acb_path).unwrap();
assert_eq!(loaded.unit_count(), 1000);
assert_eq!(loaded.edge_count(), graph.edge_count());
let sym_idx = SymbolIndex::build(&loaded);
assert_eq!(sym_idx.len(), 1000);
let type_idx = TypeIndex::build(&loaded);
let functions = type_idx.lookup(CodeUnitType::Function);
assert_eq!(functions.len(), 999);
let lang_idx = LanguageIndex::build(&loaded);
assert!(lang_idx.count(Language::Python) > 0);
assert!(lang_idx.count(Language::Rust) > 0);
assert!(lang_idx.count(Language::TypeScript) > 0);
let path_idx = PathIndex::build(&loaded);
assert!(path_idx.file_count() > 0);
let engine = QueryEngine::new();
let results = engine
.symbol_lookup(
&loaded,
SymbolLookupParams {
name: "function_500".into(),
mode: MatchMode::Exact,
..Default::default()
},
)
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].name, "function_500");
let prefix_results = engine
.symbol_lookup(
&loaded,
SymbolLookupParams {
name: "function_10".into(),
mode: MatchMode::Prefix,
limit: 50,
..Default::default()
},
)
.unwrap();
assert!(
prefix_results.len() >= 2,
"Should find multiple prefix matches"
);
let deps = engine.dependency_graph(
&loaded,
DependencyParams {
unit_id: 500,
max_depth: 3,
edge_types: vec![],
include_transitive: true,
},
);
assert!(deps.is_ok());
let impact = engine.impact_analysis(
&loaded,
ImpactParams {
unit_id: 500,
max_depth: 3,
edge_types: vec![],
},
);
assert!(impact.is_ok());
}
#[test]
fn test_cross_language_complete() {
let tmp = tempfile::tempdir().unwrap();
let src_dir = tmp.path().join("multi_lang");
std::fs::create_dir_all(&src_dir).unwrap();
create_multi_language_fixture(&src_dir);
let parser = Parser::new();
let opts = ParseOptions::default();
let parse_result = parser.parse_directory(&src_dir, &opts).unwrap();
let has_python = parse_result
.stats
.by_language
.contains_key(&Language::Python);
let has_rust = parse_result.stats.by_language.contains_key(&Language::Rust);
let has_ts = parse_result
.stats
.by_language
.contains_key(&Language::TypeScript);
let lang_count = [has_python, has_rust, has_ts]
.iter()
.filter(|&&b| b)
.count();
assert!(
lang_count >= 2,
"Expected at least 2 languages, found {}: py={}, rs={}, ts={}",
lang_count,
has_python,
has_rust,
has_ts
);
let analyzer = SemanticAnalyzer::new();
let graph = analyzer
.analyze(parse_result.units, &AnalyzeOptions::default())
.unwrap();
let languages: std::collections::HashSet<Language> =
graph.units().iter().map(|u| u.language).collect();
assert!(
languages.len() >= 2,
"Expected units from at least 2 languages, got {:?}",
languages
);
let acb_path = tmp.path().join("multi.acb");
let writer = AcbWriter::with_default_dimension();
writer.write_to_file(&graph, &acb_path).unwrap();
let loaded = AcbReader::read_from_file(&acb_path).unwrap();
let loaded_languages: std::collections::HashSet<Language> =
loaded.units().iter().map(|u| u.language).collect();
assert_eq!(
loaded_languages, languages,
"Languages should survive roundtrip"
);
let engine = QueryEngine::new();
let results = engine
.symbol_lookup(
&loaded,
SymbolLookupParams {
name: "User".into(),
mode: MatchMode::Contains,
..Default::default()
},
)
.unwrap();
assert!(!results.is_empty(), "Should find User symbols");
}
#[test]
fn test_mcp_full_workflow() {
let graph = GraphBuilder::new(DEFAULT_DIMENSION)
.add_unit(CodeUnit::new(
CodeUnitType::Module,
Language::Python,
"payments".into(),
"payments".into(),
"payments/__init__.py".into(),
Span::new(1, 0, 10, 0),
))
.add_unit(CodeUnit::new(
CodeUnitType::Function,
Language::Python,
"process_payment".into(),
"payments.stripe.process_payment".into(),
"payments/stripe.py".into(),
Span::new(1, 0, 40, 0),
))
.add_unit(CodeUnit::new(
CodeUnitType::Function,
Language::Python,
"validate_card".into(),
"payments.validation.validate_card".into(),
"payments/validation.py".into(),
Span::new(1, 0, 20, 0),
))
.add_unit(CodeUnit::new(
CodeUnitType::Test,
Language::Python,
"test_payment".into(),
"tests.test_payments.test_payment".into(),
"tests/test_payments.py".into(),
Span::new(1, 0, 15, 0),
))
.add_edge(Edge::new(1, 2, EdgeType::Calls))
.add_edge(Edge::new(3, 1, EdgeType::Tests))
.build()
.unwrap();
let mut server = McpServer::new();
let init_response = server.handle_raw(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{}}}"#);
let init: serde_json::Value = serde_json::from_str(&init_response).unwrap();
assert!(init["result"].is_object());
assert_eq!(init["result"]["protocolVersion"], "2024-11-05");
server.load_graph("payments".into(), graph);
assert!(server.get_graph("payments").is_some());
let tools_response =
server.handle_raw(r#"{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}"#);
let tools: serde_json::Value = serde_json::from_str(&tools_response).unwrap();
assert!(tools["result"]["tools"].is_array());
let lookup_response = server.handle_raw(r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"symbol_lookup","arguments":{"graph":"payments","name":"process_payment","mode":"exact"}}}"#);
let lookup: serde_json::Value = serde_json::from_str(&lookup_response).unwrap();
assert!(
lookup["result"].is_object() || lookup["result"].is_array(),
"symbol_lookup should return results: {}",
lookup
);
let impact_response = server.handle_raw(r#"{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"impact_analysis","arguments":{"graph":"payments","unit_id":1,"depth":3}}}"#);
let impact: serde_json::Value = serde_json::from_str(&impact_response).unwrap();
assert!(
!impact["error"].is_object(),
"impact_analysis should not error: {}",
impact
);
let stats_response = server.handle_raw(r#"{"jsonrpc":"2.0","id":5,"method":"resources/read","params":{"uri":"acb://graphs/payments/stats"}}"#);
let stats: serde_json::Value = serde_json::from_str(&stats_response).unwrap();
assert!(
!stats["error"].is_object(),
"graph stats should not error: {}",
stats
);
let invalid =
server.handle_raw(r#"{"jsonrpc":"2.0","id":7,"method":"nonexistent/method","params":{}}"#);
let inv: serde_json::Value = serde_json::from_str(&invalid).unwrap();
assert!(inv["error"].is_object(), "Should error on invalid method");
let removed = server.unload_graph("payments");
assert!(removed.is_some());
assert!(server.get_graph("payments").is_none());
let shutdown = server.handle_raw(r#"{"jsonrpc":"2.0","id":8,"method":"shutdown","params":{}}"#);
let shut: serde_json::Value = serde_json::from_str(&shutdown).unwrap();
assert!(!shut["error"].is_object());
}
#[test]
fn test_index_consistency_with_graph() {
let graph = build_large_graph(500);
let sym_idx = SymbolIndex::build(&graph);
let type_idx = TypeIndex::build(&graph);
let path_idx = PathIndex::build(&graph);
let lang_idx = LanguageIndex::build(&graph);
for unit in graph.units() {
let exact = sym_idx.lookup_exact(&unit.name);
assert!(
!exact.is_empty(),
"Unit '{}' not found in symbol index",
unit.name
);
}
let mut type_sum = 0;
for ut in [
CodeUnitType::Module,
CodeUnitType::Function,
CodeUnitType::Type,
CodeUnitType::Trait,
CodeUnitType::Test,
CodeUnitType::Symbol,
CodeUnitType::Parameter,
CodeUnitType::Import,
CodeUnitType::Doc,
CodeUnitType::Config,
CodeUnitType::Pattern,
CodeUnitType::Impl,
CodeUnitType::Macro,
] {
type_sum += type_idx.count(ut);
}
assert_eq!(
type_sum,
graph.unit_count(),
"Type index total doesn't match graph"
);
let mut lang_sum = 0;
for lang in lang_idx.languages() {
lang_sum += lang_idx.count(lang);
}
assert_eq!(
lang_sum,
graph.unit_count(),
"Language index total doesn't match graph"
);
for unit in graph.units() {
let found = path_idx.lookup(&unit.file_path);
assert!(
!found.is_empty(),
"File '{}' not found in path index",
unit.file_path.display()
);
}
}
#[test]
fn test_query_on_parsed_code() {
let tmp = tempfile::tempdir().unwrap();
let src_dir = tmp.path().join("query_project");
std::fs::create_dir_all(&src_dir).unwrap();
std::fs::write(
src_dir.join("main.py"),
r#"
class Database:
def connect(self):
pass
def query(self, sql: str):
pass
class UserRepository:
def __init__(self, db: Database):
self.db = db
def find_by_id(self, user_id: int):
return self.db.query("SELECT * FROM users WHERE id = ?")
def find_all(self):
return self.db.query("SELECT * FROM users")
def main():
db = Database()
repo = UserRepository(db)
user = repo.find_by_id(1)
print(user)
"#,
)
.unwrap();
let parser = Parser::new();
let opts = ParseOptions::default();
let parse_result = parser.parse_directory(&src_dir, &opts).unwrap();
assert!(parse_result.units.len() >= 3);
let analyzer = SemanticAnalyzer::new();
let graph = analyzer
.analyze(parse_result.units, &AnalyzeOptions::default())
.unwrap();
let engine = QueryEngine::new();
let results = engine
.symbol_lookup(
&graph,
SymbolLookupParams {
name: "Database".into(),
mode: MatchMode::Exact,
..Default::default()
},
)
.unwrap();
assert!(!results.is_empty(), "Should find Database class");
let repo_results = engine
.symbol_lookup(
&graph,
SymbolLookupParams {
name: "find".into(),
mode: MatchMode::Contains,
..Default::default()
},
)
.unwrap();
assert!(
repo_results.len() >= 2,
"Should find find_by_id and find_all, got {}",
repo_results.len()
);
}
#[test]
fn test_embedding_index_large() {
let mut graph = build_large_graph(200);
let dim = graph.dimension();
for i in 0..graph.unit_count() {
let mut vec = vec![0.0f32; dim];
for (d, v) in vec.iter_mut().enumerate().take(dim) {
*v = ((i * 7 + d * 3) as f32 % 100.0) / 100.0;
}
if let Some(unit) = graph.get_unit_mut(i as u64) {
unit.feature_vec = vec;
}
}
let emb_idx = EmbeddingIndex::build(&graph);
assert_eq!(emb_idx.len(), 200);
let mut query = vec![0.5f32; dim];
query[0] = 1.0;
let results = emb_idx.search(&query, 10, 0.0);
assert!(results.len() <= 10);
for w in results.windows(2) {
assert!(
w[0].score >= w[1].score,
"Results not sorted: {} >= {}",
w[0].score,
w[1].score
);
}
}
#[test]
fn test_cli_compile_and_info_roundtrip() {
let tmp = tempfile::tempdir().unwrap();
let src_dir = tmp.path().join("cli_project");
std::fs::create_dir_all(&src_dir).unwrap();
std::fs::write(
src_dir.join("simple.py"),
"def hello():\n return 'world'\n",
)
.unwrap();
let acb_path = tmp.path().join("output.acb");
let bin = env!("CARGO_BIN_EXE_acb");
let output = std::process::Command::new(bin)
.args(["compile", src_dir.to_str().unwrap()])
.arg("--output")
.arg(acb_path.to_str().unwrap())
.output()
.unwrap();
assert!(
output.status.success(),
"Compile failed: {}",
String::from_utf8_lossy(&output.stderr)
);
let info_output = std::process::Command::new(bin)
.args(["info", acb_path.to_str().unwrap()])
.output()
.unwrap();
assert!(info_output.status.success());
let info_str = String::from_utf8_lossy(&info_output.stdout);
assert!(info_str.contains("Units:"), "Info should show units count");
let query_output = std::process::Command::new(bin)
.args([
"query",
acb_path.to_str().unwrap(),
"symbol",
"--name",
"hello",
])
.output()
.unwrap();
assert!(query_output.status.success());
let query_str = String::from_utf8_lossy(&query_output.stdout);
assert!(
query_str.contains("hello"),
"Query should find 'hello' function"
);
let json_output = std::process::Command::new(bin)
.args(["info", acb_path.to_str().unwrap(), "--format", "json"])
.output()
.unwrap();
assert!(json_output.status.success());
let json: Result<serde_json::Value, _> = serde_json::from_slice(&json_output.stdout);
assert!(json.is_ok(), "JSON output should be valid JSON");
}
#[test]
fn test_temporal_collective_integration() {
use agentic_codebase::collective::CollectiveManager;
use agentic_codebase::temporal::history::ChangeHistory;
use agentic_codebase::temporal::stability::StabilityAnalyzer;
let mut history = ChangeHistory::new();
for i in 0..50u64 {
let path = format!("src/module_{}.py", i % 50);
history.add_change(agentic_codebase::temporal::history::FileChange {
path: path.into(),
commit_id: format!("abc{:04}", i),
author: "dev@example.com".into(),
timestamp: 1700000000 + i * 3600,
change_type: agentic_codebase::temporal::history::ChangeType::Modify,
lines_added: 10,
lines_deleted: 5,
is_bugfix: i % 5 == 0,
old_path: None,
});
}
let analyzer = StabilityAnalyzer::new();
let stability = analyzer.calculate_stability(Path::new("src/module_0.py"), &history);
assert!(stability.overall_score >= 0.0 && stability.overall_score <= 1.0);
let manager = CollectiveManager::offline();
assert!(manager.is_offline());
}
#[test]
fn test_concurrent_read_operations() {
use std::sync::Arc;
let graph = Arc::new(build_large_graph(500));
let engine = Arc::new(QueryEngine::new());
let mut handles = vec![];
for t in 0..4 {
let g = graph.clone();
let e = engine.clone();
handles.push(std::thread::spawn(move || {
for i in 0..50 {
let idx = (t * 50 + i) % 499 + 1;
let name = format!("function_{}", idx);
let results = e
.symbol_lookup(
&g,
SymbolLookupParams {
name,
mode: MatchMode::Exact,
..Default::default()
},
)
.unwrap();
assert!(!results.is_empty());
}
}));
}
for h in handles {
h.join().unwrap();
}
}