use std::sync::{Arc, Mutex, OnceLock};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Mutex as TokioMutex;
use tracing_subscriber::filter::LevelFilter;
fn env_var_lock() -> std::sync::MutexGuard<'static, ()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
let m = LOCK.get_or_init(|| Mutex::new(()));
m.lock().unwrap_or_else(|e| e.into_inner())
}
fn make_test_analyzer() -> aptu_coder::CodeAnalyzer {
let peer = Arc::new(TokioMutex::new(None));
let log_level_filter = Arc::new(Mutex::new(LevelFilter::INFO));
let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<aptu_coder::logging::LogEvent>();
let (metrics_tx, _metrics_rx) = tokio::sync::mpsc::unbounded_channel();
aptu_coder::CodeAnalyzer::new(
peer,
log_level_filter,
rx,
aptu_coder::metrics::MetricsSender(metrics_tx),
)
}
async fn call_tools_list_with_profile(profile: Option<&str>) -> serde_json::Value {
let analyzer = make_test_analyzer();
let (client, server) = tokio::io::duplex(65536);
let mut server_handle = tokio::spawn(async move {
let (server_rx, server_tx) = tokio::io::split(server);
if let Ok(service) = rmcp::serve_server(analyzer, (server_rx, server_tx)).await {
let _ = service.waiting().await;
}
});
let (client_rx, mut client_tx) = tokio::io::split(client);
let mut reader = BufReader::new(client_rx).lines();
let mut init_params = serde_json::json!({
"protocolVersion": "2025-11-25",
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "0.1.0"}
});
if let Some(p) = profile {
init_params["_meta"] = serde_json::json!({
"io.clouatre-labs/profile": p
});
}
let init = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": init_params
})
.to_string()
+ "\n";
client_tx.write_all(init.as_bytes()).await.unwrap();
client_tx.flush().await.unwrap();
let _resp = reader.next_line().await.unwrap().unwrap();
let notif = serde_json::json!({
"jsonrpc": "2.0",
"method": "notifications/initialized",
"params": {}
})
.to_string()
+ "\n";
client_tx.write_all(notif.as_bytes()).await.unwrap();
client_tx.flush().await.unwrap();
let list_req = serde_json::json!({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
})
.to_string()
+ "\n";
client_tx.write_all(list_req.as_bytes()).await.unwrap();
client_tx.flush().await.unwrap();
tokio::select! {
result = async {
loop {
let line = reader.next_line().await.unwrap().unwrap();
let v: serde_json::Value = serde_json::from_str(&line).unwrap();
if v.get("id") == Some(&serde_json::json!(2)) {
return v;
}
}
} => {
server_handle.abort();
result
}
outcome = &mut server_handle => {
match outcome {
Ok(_) => panic!("server task exited unexpectedly before tools/list response"),
Err(e) => panic!("server task panicked: {e}"),
}
}
}
}
async fn call_tool_with_profile(profile: Option<&str>, tool_name: &str) -> serde_json::Value {
let analyzer = make_test_analyzer();
let (client, server) = tokio::io::duplex(65536);
let mut server_handle = tokio::spawn(async move {
let (server_rx, server_tx) = tokio::io::split(server);
if let Ok(service) = rmcp::serve_server(analyzer, (server_rx, server_tx)).await {
let _ = service.waiting().await;
}
});
let (client_rx, mut client_tx) = tokio::io::split(client);
let mut reader = BufReader::new(client_rx).lines();
let mut init_params = serde_json::json!({
"protocolVersion": "2025-11-25",
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "0.1.0"}
});
if let Some(p) = profile {
init_params["_meta"] = serde_json::json!({
"io.clouatre-labs/profile": p
});
}
let init = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": init_params
})
.to_string()
+ "\n";
client_tx.write_all(init.as_bytes()).await.unwrap();
client_tx.flush().await.unwrap();
let _resp = reader.next_line().await.unwrap().unwrap();
let notif = serde_json::json!({
"jsonrpc": "2.0",
"method": "notifications/initialized",
"params": {}
})
.to_string()
+ "\n";
client_tx.write_all(notif.as_bytes()).await.unwrap();
client_tx.flush().await.unwrap();
let call = serde_json::json!({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": {}
}
})
.to_string()
+ "\n";
client_tx.write_all(call.as_bytes()).await.unwrap();
client_tx.flush().await.unwrap();
tokio::select! {
result = async {
loop {
let line = reader.next_line().await.unwrap().unwrap();
let v: serde_json::Value = serde_json::from_str(&line).unwrap();
if v.get("id") == Some(&serde_json::json!(2)) {
return v;
}
}
} => {
server_handle.abort();
result
}
outcome = &mut server_handle => {
match outcome {
Ok(_) => panic!("server task exited unexpectedly before tool response"),
Err(e) => panic!("server task panicked: {e}"),
}
}
}
}
#[tokio::test]
async fn test_edit_profile_tool_count() {
let _guard = env_var_lock();
let resp = call_tools_list_with_profile(Some("edit")).await;
let tools = &resp["result"]["tools"];
let tool_count = tools.as_array().map(|a| a.len()).unwrap_or(0);
let tool_names: Vec<String> = tools
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|t| t["name"].as_str().map(|s| s.to_string()))
.collect();
assert_eq!(
tool_count, 3,
"edit profile should enable exactly 3 tools, got: {:?}",
tool_names
);
assert!(
tool_names.contains(&"edit_replace".to_string()),
"edit profile should include edit_replace"
);
assert!(
tool_names.contains(&"edit_overwrite".to_string()),
"edit profile should include edit_overwrite"
);
assert!(
tool_names.contains(&"exec_command".to_string()),
"edit profile should include exec_command"
);
}
#[tokio::test]
async fn test_analyze_profile_tool_count() {
let _guard = env_var_lock();
let resp = call_tools_list_with_profile(Some("analyze")).await;
let tools = &resp["result"]["tools"];
let tool_count = tools.as_array().map(|a| a.len()).unwrap_or(0);
let tool_names: Vec<String> = tools
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|t| t["name"].as_str().map(|s| s.to_string()))
.collect();
assert_eq!(
tool_count, 5,
"analyze profile should enable exactly 5 tools, got: {:?}",
tool_names
);
assert!(
tool_names.contains(&"analyze_directory".to_string()),
"analyze profile should include analyze_directory"
);
assert!(
tool_names.contains(&"analyze_file".to_string()),
"analyze profile should include analyze_file"
);
assert!(
tool_names.contains(&"analyze_module".to_string()),
"analyze profile should include analyze_module"
);
assert!(
tool_names.contains(&"analyze_symbol".to_string()),
"analyze profile should include analyze_symbol"
);
assert!(
tool_names.contains(&"exec_command".to_string()),
"analyze profile should include exec_command"
);
}
#[tokio::test]
async fn test_remote_profile_tool_count() {
let _guard = env_var_lock();
let resp = call_tools_list_with_profile(Some("remote")).await;
let tools = &resp["result"]["tools"];
let tool_count = tools.as_array().map(|a| a.len()).unwrap_or(0);
let tool_names: Vec<String> = tools
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|t| t["name"].as_str().map(|s| s.to_string()))
.collect();
assert_eq!(
tool_count, 9,
"remote profile should enable all 9 tools, got: {:?}",
tool_names
);
assert!(
tool_names.contains(&"remote_tree".to_string()),
"remote profile should include remote_tree"
);
assert!(
tool_names.contains(&"remote_file".to_string()),
"remote profile should include remote_file"
);
}
#[tokio::test]
async fn test_no_profile_tool_count() {
let _guard = env_var_lock();
unsafe {
std::env::remove_var("APTU_CODER_PROFILE");
}
let resp = call_tools_list_with_profile(None).await;
let tools = &resp["result"]["tools"];
let tool_count = tools.as_array().map(|a| a.len()).unwrap_or(0);
assert_eq!(
tool_count, 7,
"no profile should enable 7 tools (remote_tree and remote_file disabled), got: {}",
tool_count
);
}
#[tokio::test]
async fn test_unknown_profile_tool_count() {
let _guard = env_var_lock();
unsafe {
std::env::remove_var("APTU_CODER_PROFILE");
}
let resp = call_tools_list_with_profile(Some("unknown_profile")).await;
let tools = &resp["result"]["tools"];
let tool_count = tools.as_array().map(|a| a.len()).unwrap_or(0);
assert_eq!(
tool_count, 7,
"unknown profile should enable 7 tools (remote_tree and remote_file disabled), got: {}",
tool_count
);
}
#[tokio::test]
async fn test_disabled_tool_returns_invalid_params() {
let _guard = env_var_lock();
let resp = call_tool_with_profile(Some("edit"), "analyze_directory").await;
let error_code = resp["error"]["code"].as_i64();
assert_eq!(
error_code,
Some(-32602),
"calling a disabled tool should return INVALID_PARAMS (-32602), got: {:?}",
resp
);
}
#[tokio::test]
async fn test_profile_env_var_fallback() {
let _guard = env_var_lock();
unsafe {
std::env::set_var("APTU_CODER_PROFILE", "edit");
}
let resp = call_tools_list_with_profile(None).await;
unsafe {
std::env::remove_var("APTU_CODER_PROFILE");
}
let tools = &resp["result"]["tools"];
let tool_count = tools.as_array().map(|a| a.len()).unwrap_or(0);
let tool_names: Vec<String> = tools
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|t| t["name"].as_str().map(|s| s.to_string()))
.collect();
assert_eq!(
tool_count, 3,
"env var profile (edit) should enable exactly 3 tools, got: {:?}",
tool_names
);
assert!(
tool_names.contains(&"edit_replace".to_string()),
"env var profile should include edit_replace"
);
}
#[tokio::test]
async fn test_profile_env_var_ignored_when_meta_present() {
let _guard = env_var_lock();
unsafe {
std::env::set_var("APTU_CODER_PROFILE", "edit");
}
let resp = call_tools_list_with_profile(Some("analyze")).await;
unsafe {
std::env::remove_var("APTU_CODER_PROFILE");
}
let tools = &resp["result"]["tools"];
let tool_count = tools.as_array().map(|a| a.len()).unwrap_or(0);
let tool_names: Vec<String> = tools
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|t| t["name"].as_str().map(|s| s.to_string()))
.collect();
assert_eq!(
tool_count, 5,
"_meta profile (analyze) should take precedence over env var, got: {:?}",
tool_names
);
assert!(
tool_names.contains(&"analyze_directory".to_string()),
"_meta profile should include analyze_directory"
);
}