use super::LspRenameRequest;
use super::{
LspCodeActionRequest, LspDiagnosticRequest, LspRenamePlanRequest, LspRequest, LspSessionPool,
LspTypeHierarchyRequest, LspWorkspaceSymbolRequest, code_action_refactor_via_lsp,
default_lsp_args_for_command, default_lsp_command_for_path, find_referencing_symbols_via_lsp,
get_diagnostics_via_lsp, get_rename_plan_via_lsp, get_type_hierarchy_via_lsp,
rename_symbol_via_lsp, search_workspace_symbols_via_lsp,
};
use crate::ProjectRoot;
use serde_json::Value;
use std::fs;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
#[test]
fn reads_references_from_mock_lsp() {
let dir = temp_dir("codelens-lsp-test");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(dir.join("sample.py"), "def greet():\n return 1\n").expect("write sample");
let server_path = dir.join("mock_lsp.py");
fs::write(&server_path, mock_server_script()).expect("write mock server");
chmod_exec(&server_path);
let refs = find_referencing_symbols_via_lsp(
&project,
&LspRequest {
command: "python3".to_owned(),
args: vec![
server_path.display().to_string(),
dir.join("count.txt").display().to_string(),
],
file_path: "sample.py".to_owned(),
line: 1,
column: 5,
max_results: 10,
},
)
.expect("lsp references");
assert_eq!(refs.len(), 1);
assert_eq!(refs[0].file_path, "sample.py");
assert_eq!(refs[0].line, 1);
assert_eq!(refs[0].column, 5);
}
#[test]
fn reuses_pooled_session() {
let dir = temp_dir("codelens-lsp-pool");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(dir.join("sample.py"), "def greet():\n return 1\n").expect("write sample");
let server_path = dir.join("mock_lsp.py");
let count_path = dir.join("count.txt");
fs::write(&server_path, mock_server_script()).expect("write mock server");
chmod_exec(&server_path);
let pool = LspSessionPool::new(project.clone());
let request = LspRequest {
command: "python3".to_owned(),
args: vec![
server_path.display().to_string(),
count_path.display().to_string(),
],
file_path: "sample.py".to_owned(),
line: 1,
column: 5,
max_results: 10,
};
let refs1 = pool.find_referencing_symbols(&request).expect("refs1");
let refs2 = pool.find_referencing_symbols(&request).expect("refs2");
assert_eq!(refs1.len(), 1);
assert_eq!(refs2.len(), 1);
assert_eq!(pool.session_count(), 1);
drop(pool);
let initialize_count = fs::read_to_string(&count_path)
.expect("count file")
.trim()
.parse::<usize>()
.expect("count");
assert_eq!(initialize_count, 1);
}
#[test]
fn reads_diagnostics_from_mock_lsp() {
let dir = temp_dir("codelens-lsp-diagnostics");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(dir.join("sample.py"), "def greet(:\n return 1\n").expect("write sample");
let server_path = dir.join("mock_lsp.py");
fs::write(&server_path, mock_server_script()).expect("write mock server");
chmod_exec(&server_path);
let diagnostics = get_diagnostics_via_lsp(
&project,
&LspDiagnosticRequest {
command: "python3".to_owned(),
args: vec![server_path.display().to_string()],
file_path: "sample.py".to_owned(),
max_results: 10,
},
)
.expect("lsp diagnostics");
assert_eq!(diagnostics.len(), 1);
assert_eq!(diagnostics[0].file_path, "sample.py");
assert_eq!(diagnostics[0].severity_label.as_deref(), Some("error"));
assert!(diagnostics[0].message.contains("syntax"));
}
#[test]
fn reads_workspace_symbols_from_mock_lsp() {
let dir = temp_dir("codelens-lsp-workspace-symbols");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(dir.join("sample.py"), "class Service:\n pass\n").expect("write sample");
let server_path = dir.join("mock_lsp.py");
fs::write(&server_path, mock_server_script()).expect("write mock server");
chmod_exec(&server_path);
let symbols = search_workspace_symbols_via_lsp(
&project,
&LspWorkspaceSymbolRequest {
command: "python3".to_owned(),
args: vec![
server_path.display().to_string(),
dir.join("sample.py").display().to_string(),
],
query: "Service".to_owned(),
max_results: 10,
},
)
.expect("workspace symbols");
assert_eq!(symbols.len(), 1);
assert_eq!(symbols[0].name, "Service");
assert_eq!(symbols[0].kind_label.as_deref(), Some("class"));
assert_eq!(symbols[0].file_path, "sample.py");
}
#[test]
fn reads_type_hierarchy_from_mock_lsp() {
let dir = temp_dir("codelens-lsp-type-hierarchy");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(dir.join("sample.py"), "class Service:\n pass\n").expect("write sample");
let server_path = dir.join("mock_lsp.py");
fs::write(&server_path, mock_server_script()).expect("write mock server");
chmod_exec(&server_path);
let hierarchy = get_type_hierarchy_via_lsp(
&project,
&LspTypeHierarchyRequest {
command: "python3".to_owned(),
args: vec![
server_path.display().to_string(),
dir.join("sample.py").display().to_string(),
],
query: "Service".to_owned(),
relative_path: Some("sample.py".to_owned()),
hierarchy_type: "both".to_owned(),
depth: 1,
},
)
.expect("type hierarchy");
assert_eq!(
hierarchy.get("class_name"),
Some(&Value::String("Service".to_owned()))
);
assert_eq!(
hierarchy.get("fully_qualified_name"),
Some(&Value::String("sample.Service".to_owned()))
);
assert!(
hierarchy
.get("supertypes")
.and_then(Value::as_array)
.is_some_and(|items: &Vec<Value>| !items.is_empty())
);
assert!(
hierarchy
.get("subtypes")
.and_then(Value::as_array)
.is_some_and(|items: &Vec<Value>| !items.is_empty())
);
}
#[test]
fn reads_rename_plan_from_mock_lsp() {
let dir = temp_dir("codelens-lsp-rename-plan");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(dir.join("sample.py"), "class Service:\n pass\n").expect("write sample");
let server_path = dir.join("mock_lsp.py");
fs::write(&server_path, mock_server_script()).expect("write mock server");
chmod_exec(&server_path);
let plan = get_rename_plan_via_lsp(
&project,
&LspRenamePlanRequest {
command: "python3".to_owned(),
args: vec![server_path.display().to_string()],
file_path: "sample.py".to_owned(),
line: 1,
column: 8,
new_name: Some("RenamedService".to_owned()),
},
)
.expect("rename plan");
assert_eq!(plan.file_path, "sample.py");
assert_eq!(plan.current_name, "Service");
assert_eq!(plan.placeholder.as_deref(), Some("Service"));
assert_eq!(plan.new_name.as_deref(), Some("RenamedService"));
}
#[test]
fn applies_rename_workspace_edit_from_mock_lsp() {
let dir = temp_dir("codelens-lsp-rename-apply");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(
dir.join("sample.py"),
"class Service:\n pass\n\nService()\n",
)
.expect("write sample");
let server_path = dir.join("mock_lsp.py");
fs::write(&server_path, mock_server_script()).expect("write mock server");
chmod_exec(&server_path);
let result = rename_symbol_via_lsp(
&project,
&LspRenameRequest {
command: "python3".to_owned(),
args: vec![server_path.display().to_string()],
file_path: "sample.py".to_owned(),
line: 1,
column: 8,
new_name: "RenamedService".to_owned(),
dry_run: false,
},
)
.expect("rename result");
assert_eq!(result.total_replacements, 2);
assert_eq!(result.modified_files, 1);
let updated = fs::read_to_string(dir.join("sample.py")).expect("read updated sample");
assert!(updated.contains("class RenamedService:"));
assert!(updated.contains("RenamedService()"));
}
#[test]
fn lsp_requests_translate_byte_columns_to_utf16_positions() {
let dir = temp_dir("codelens-lsp-utf16-request");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(dir.join("sample.py"), "🙂 greet()\n").expect("write sample");
let server_path = dir.join("mock_utf16_lsp.py");
fs::write(&server_path, utf16_position_mock_server_script()).expect("write mock server");
chmod_exec(&server_path);
let refs = find_referencing_symbols_via_lsp(
&project,
&LspRequest {
command: "python3".to_owned(),
args: vec![server_path.display().to_string()],
file_path: "sample.py".to_owned(),
line: 1,
column: "🙂 ".len() + 1,
max_results: 10,
},
)
.expect("lsp references");
assert_eq!(
refs.len(),
1,
"server only returns a reference for UTF-16 character 3"
);
assert_eq!(refs[0].file_path, "sample.py");
assert_eq!(refs[0].column, "🙂 ".len() + 1);
}
#[test]
fn rename_via_lsp_runs_prepare_rename_before_workspace_edit() {
let dir = temp_dir("codelens-lsp-rename-prepare-first");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(
dir.join("sample.py"),
"class Service:\n pass\n\nService()\n",
)
.expect("write sample");
let server_path = dir.join("mock_prepare_required_lsp.py");
fs::write(&server_path, prepare_required_rename_mock_server_script())
.expect("write mock server");
chmod_exec(&server_path);
let result = rename_symbol_via_lsp(
&project,
&LspRenameRequest {
command: "python3".to_owned(),
args: vec![server_path.display().to_string()],
file_path: "sample.py".to_owned(),
line: 1,
column: 7,
new_name: "RenamedService".to_owned(),
dry_run: true,
},
)
.expect("rename result");
assert_eq!(result.total_replacements, 2);
}
#[test]
fn applies_lsp_code_action_workspace_edit() {
let dir = temp_dir("codelens-lsp-code-action-apply");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(
dir.join("sample.ts"),
"function main() {\n const value = 1;\n console.log(value);\n}\n",
)
.expect("write sample");
let server_path = dir.join("mock_code_action_lsp.py");
fs::write(&server_path, code_action_mock_server_script(false)).expect("write mock server");
chmod_exec(&server_path);
let result = code_action_refactor_via_lsp(
&project,
&LspCodeActionRequest {
command: "python3".to_owned(),
args: vec![server_path.display().to_string()],
file_path: "sample.ts".to_owned(),
start_line: 2,
start_column: 3,
end_line: 2,
end_column: " const value = 1;".len() + 1,
only: vec!["refactor.extract".to_owned()],
action_id: None,
operation: "extract_function".to_owned(),
dry_run: false,
},
)
.expect("code action refactor");
assert_eq!(result.transaction.edit_count, 2);
assert_eq!(result.action_kind.as_deref(), Some("refactor.extract"));
let updated = fs::read_to_string(dir.join("sample.ts")).expect("read updated");
assert!(updated.contains("extracted();"));
assert!(updated.contains("function extracted()"));
}
#[test]
fn resolves_lsp_code_action_before_workspace_edit_apply() {
let dir = temp_dir("codelens-lsp-code-action-resolve");
let project = ProjectRoot::new(&dir).expect("project");
fs::write(
dir.join("sample.ts"),
"function main() {\n const value = 1;\n}\n",
)
.expect("write sample");
let server_path = dir.join("mock_code_action_resolve_lsp.py");
fs::write(&server_path, code_action_mock_server_script(true)).expect("write mock server");
chmod_exec(&server_path);
let result = code_action_refactor_via_lsp(
&project,
&LspCodeActionRequest {
command: "python3".to_owned(),
args: vec![server_path.display().to_string()],
file_path: "sample.ts".to_owned(),
start_line: 2,
start_column: 3,
end_line: 2,
end_column: " const value = 1;".len() + 1,
only: vec!["refactor.extract".to_owned()],
action_id: None,
operation: "extract_function".to_owned(),
dry_run: false,
},
)
.expect("resolved code action refactor");
assert_eq!(result.resolved_via, "codeAction/resolve");
let updated = fs::read_to_string(dir.join("sample.ts")).expect("read updated");
assert!(updated.contains("extracted();"));
}
#[test]
fn default_lsp_command_is_derived_from_registry_by_path() {
assert_eq!(
default_lsp_command_for_path("src/main.py"),
Some("pyright-langserver")
);
assert_eq!(default_lsp_command_for_path("src/Build.SC"), Some("metals"));
assert_eq!(
default_lsp_command_for_path("src/native/foo.hpp"),
Some("clangd")
);
}
#[test]
fn default_lsp_args_are_derived_from_registry_by_command() {
assert_eq!(
default_lsp_args_for_command("clangd"),
Some(&["--background-index"][..])
);
assert_eq!(
default_lsp_args_for_command("typescript-language-server"),
Some(&["--stdio"][..])
);
assert_eq!(default_lsp_args_for_command("metals"), Some(&[][..]));
}
fn chmod_exec(_path: &std::path::Path) {
#[cfg(unix)]
{
let mut perms = fs::metadata(_path).expect("metadata").permissions();
perms.set_mode(0o755);
fs::set_permissions(_path, perms).expect("chmod");
}
}
fn temp_dir(prefix: &str) -> std::path::PathBuf {
let dir = std::env::temp_dir().join(format!(
"{prefix}-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time")
.as_nanos()
));
fs::create_dir_all(&dir).expect("create dir");
dir
}
fn utf16_position_mock_server_script() -> &'static str {
r#"#!/usr/bin/env python3
import json
import sys
def read_message():
headers = {}
while True:
line = sys.stdin.buffer.readline()
if not line:
return None
if line in (b"\r\n", b"\n"):
break
name, value = line.decode("utf-8").split(":", 1)
headers[name.strip().lower()] = value.strip()
body = sys.stdin.buffer.read(int(headers["content-length"]))
return json.loads(body.decode("utf-8"))
def send(payload):
body = json.dumps(payload).encode("utf-8")
sys.stdout.buffer.write(f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8"))
sys.stdout.buffer.write(body)
sys.stdout.buffer.flush()
while True:
message = read_message()
if message is None:
break
method = message.get("method")
if method == "initialize":
send({"jsonrpc":"2.0","id":message["id"],"result":{"capabilities":{"referencesProvider": True}}})
elif method == "textDocument/references":
uri = message["params"]["textDocument"]["uri"]
character = message["params"]["position"]["character"]
result = []
if character == 3:
result = [{
"uri": uri,
"range": {
"start": {"line": 0, "character": 3},
"end": {"line": 0, "character": 8}
}
}]
send({"jsonrpc":"2.0","id":message["id"],"result":result})
elif method == "shutdown":
send({"jsonrpc":"2.0","id":message["id"],"result":None})
elif method == "exit":
break
"#
}
fn prepare_required_rename_mock_server_script() -> &'static str {
r#"#!/usr/bin/env python3
import json
import sys
prepared = False
def read_message():
headers = {}
while True:
line = sys.stdin.buffer.readline()
if not line:
return None
if line in (b"\r\n", b"\n"):
break
name, value = line.decode("utf-8").split(":", 1)
headers[name.strip().lower()] = value.strip()
body = sys.stdin.buffer.read(int(headers["content-length"]))
return json.loads(body.decode("utf-8"))
def send(payload):
body = json.dumps(payload).encode("utf-8")
sys.stdout.buffer.write(f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8"))
sys.stdout.buffer.write(body)
sys.stdout.buffer.flush()
while True:
message = read_message()
if message is None:
break
method = message.get("method")
if method == "initialize":
send({"jsonrpc":"2.0","id":message["id"],"result":{"capabilities":{"renameProvider":{"prepareProvider": True}}}})
elif method == "textDocument/prepareRename":
prepared = True
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":{
"range":{
"start":{"line":0,"character":6},
"end":{"line":0,"character":13}
},
"placeholder":"Service"
}
})
elif method == "textDocument/rename":
if not prepared:
send({"jsonrpc":"2.0","id":message["id"],"error":{"code":-32000,"message":"prepareRename required"}})
continue
uri = message["params"]["textDocument"]["uri"]
new_name = message["params"]["newName"]
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":{
"changes": {
uri: [
{
"range":{
"start":{"line":0,"character":6},
"end":{"line":0,"character":13}
},
"newText": new_name
},
{
"range":{
"start":{"line":3,"character":0},
"end":{"line":3,"character":7}
},
"newText": new_name
}
]
}
}
})
elif method == "shutdown":
send({"jsonrpc":"2.0","id":message["id"],"result":None})
elif method == "exit":
break
"#
}
fn code_action_mock_server_script(resolve_required: bool) -> String {
format!(
r#"#!/usr/bin/env python3
import json
import sys
RESOLVE_REQUIRED = {resolve_required}
def read_message():
headers = {{}}
while True:
line = sys.stdin.buffer.readline()
if not line:
return None
if line in (b"\r\n", b"\n"):
break
name, value = line.decode("utf-8").split(":", 1)
headers[name.strip().lower()] = value.strip()
body = sys.stdin.buffer.read(int(headers["content-length"]))
return json.loads(body.decode("utf-8"))
def send(payload):
body = json.dumps(payload).encode("utf-8")
sys.stdout.buffer.write(f"Content-Length: {{len(body)}}\r\n\r\n".encode("utf-8"))
sys.stdout.buffer.write(body)
sys.stdout.buffer.flush()
def edit(uri):
return {{
"changes": {{
uri: [
{{
"range": {{
"start": {{"line": 1, "character": 2}},
"end": {{"line": 1, "character": 18}}
}},
"newText": "extracted();"
}},
{{
"range": {{
"start": {{"line": 4, "character": 0}},
"end": {{"line": 4, "character": 0}}
}},
"newText": "\nfunction extracted() {{\n const value = 1;\n}}\n"
}}
]
}}
}}
while True:
message = read_message()
if message is None:
break
method = message.get("method")
if method == "initialize":
send({{"jsonrpc":"2.0","id":message["id"],"result":{{"capabilities":{{"codeActionProvider":{{"resolveProvider": True}}}}}}}})
elif method == "textDocument/codeAction":
uri = message["params"]["textDocument"]["uri"]
action = {{
"title": "Extract function",
"kind": "refactor.extract",
"data": {{"uri": uri}}
}}
if not RESOLVE_REQUIRED:
action["edit"] = edit(uri)
send({{"jsonrpc":"2.0","id":message["id"],"result":[action]}})
elif method == "codeAction/resolve":
uri = message["params"]["data"]["uri"]
action = dict(message["params"])
action["edit"] = edit(uri)
send({{"jsonrpc":"2.0","id":message["id"],"result":action}})
elif method == "shutdown":
send({{"jsonrpc":"2.0","id":message["id"],"result":None}})
elif method == "exit":
break
"#,
resolve_required = if resolve_required { "True" } else { "False" }
)
}
fn mock_server_script() -> &'static str {
r#"#!/usr/bin/env python3
import json
import sys
from pathlib import Path
count_file = Path(sys.argv[1]) if len(sys.argv) > 1 and sys.argv[1].endswith(".txt") else None
symbol_path = Path(sys.argv[1]) if len(sys.argv) > 1 and not sys.argv[1].endswith(".txt") else None
if len(sys.argv) > 2:
symbol_path = Path(sys.argv[2])
initialize_count = 0
def read_message():
headers = {}
while True:
line = sys.stdin.buffer.readline()
if not line:
return None
if line in (b"\r\n", b"\n"):
break
name, value = line.decode("utf-8").split(":", 1)
headers[name.strip().lower()] = value.strip()
body = sys.stdin.buffer.read(int(headers["content-length"]))
return json.loads(body.decode("utf-8"))
def send(payload):
body = json.dumps(payload).encode("utf-8")
sys.stdout.buffer.write(f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8"))
sys.stdout.buffer.write(body)
sys.stdout.buffer.flush()
while True:
message = read_message()
if message is None:
break
method = message.get("method")
if method == "initialize":
initialize_count += 1
if count_file:
count_file.write_text(str(initialize_count))
send({"jsonrpc":"2.0","id":message["id"],"result":{"capabilities":{"referencesProvider": True}}})
elif method == "textDocument/references":
uri = message["params"]["textDocument"]["uri"]
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":[
{
"uri": uri,
"range": {
"start": {"line": 0, "character": 4},
"end": {"line": 0, "character": 9}
}
}
]
})
elif method == "textDocument/diagnostic":
uri = message["params"]["textDocument"]["uri"]
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":{
"kind":"full",
"uri": uri,
"items":[
{
"range":{
"start":{"line":0,"character":10},
"end":{"line":0,"character":11}
},
"severity":1,
"code":"E999",
"source":"mock-lsp",
"message":"syntax error"
}
]
}
})
elif method == "workspace/symbol":
query = message["params"]["query"]
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":[
{
"name": query,
"kind": 5,
"containerName": "sample",
"location": {
"uri": "file://" + str(symbol_path.resolve() if symbol_path else (Path.cwd() / "sample.py").resolve()),
"range": {
"start": {"line": 0, "character": 6},
"end": {"line": 0, "character": 13}
}
}
}
]
})
elif method == "textDocument/prepareTypeHierarchy":
uri = message["params"]["textDocument"]["uri"]
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":[
{
"name":"Service",
"kind":5,
"detail":"sample.Service",
"uri": uri,
"range":{
"start":{"line":0,"character":6},
"end":{"line":0,"character":13}
},
"selectionRange":{
"start":{"line":0,"character":6},
"end":{"line":0,"character":13}
},
"data":{"name":"Service"}
}
]
})
elif method == "typeHierarchy/supertypes":
item = message["params"]["item"]
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":[
{
"name":"BaseService",
"kind":5,
"detail":"sample.BaseService",
"uri": item["uri"],
"range": item["range"],
"selectionRange": item["selectionRange"],
"data":{"name":"BaseService"}
}
]
})
elif method == "typeHierarchy/subtypes":
item = message["params"]["item"]
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":[
{
"name":"ServiceImpl",
"kind":5,
"detail":"sample.ServiceImpl",
"uri": item["uri"],
"range": item["range"],
"selectionRange": item["selectionRange"],
"data":{"name":"ServiceImpl"}
}
]
})
elif method == "textDocument/prepareRename":
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":{
"range":{
"start":{"line":0,"character":6},
"end":{"line":0,"character":13}
},
"placeholder":"Service"
}
})
elif method == "textDocument/rename":
uri = message["params"]["textDocument"]["uri"]
new_name = message["params"]["newName"]
send({
"jsonrpc":"2.0",
"id":message["id"],
"result":{
"changes": {
uri: [
{
"range":{
"start":{"line":0,"character":6},
"end":{"line":0,"character":13}
},
"newText": new_name
},
{
"range":{
"start":{"line":3,"character":0},
"end":{"line":3,"character":7}
},
"newText": new_name
}
]
}
}
})
elif method == "shutdown":
send({"jsonrpc":"2.0","id":message["id"],"result":None})
elif method == "exit":
break
"#
}