#![cfg(all(unix, feature = "plugins"))]
use crate::common::harness::{copy_plugin, copy_plugin_lib, EditorTestHarness, HarnessOptions};
use crate::common::tracing::init_tracing_from_env;
use crossterm::event::{KeyCode, KeyModifiers};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
fn assert_test_infra() {
match std::process::Command::new("python3")
.arg("--version")
.output()
{
Ok(out) if out.status.success() => {
eprintln!(
"[infra] python3: {}",
String::from_utf8_lossy(&out.stdout).trim()
);
}
Ok(out) => panic!(
"infra: `python3 --version` exited {}; stderr={:?}. \
The fake-pylsp shebang requires python3 on PATH.",
out.status,
String::from_utf8_lossy(&out.stderr).trim()
),
Err(e) => panic!(
"infra: `python3 --version` failed to spawn: {e}. \
The fake-pylsp shebang requires python3 on PATH."
),
}
let pylsp = fake_lsp_bin_dir().join("fake-pylsp");
let meta = match std::fs::metadata(&pylsp) {
Ok(m) => m,
Err(e) => panic!("infra: fake-pylsp missing at {pylsp:?}: {e}"),
};
assert!(
meta.is_file(),
"infra: fake-pylsp at {pylsp:?} is not a regular file"
);
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mode = meta.permissions().mode();
assert!(
mode & 0o111 != 0,
"infra: fake-pylsp at {pylsp:?} is not executable (mode={:o}). \
`chmod +x` it or check that git preserved the executable bit.",
mode
);
}
let first_line = std::fs::read_to_string(&pylsp)
.ok()
.and_then(|s| s.lines().next().map(str::to_string))
.unwrap_or_default();
assert!(
first_line.starts_with("#!") && first_line.contains("python"),
"infra: fake-pylsp shebang should reference python; got {first_line:?}"
);
eprintln!("[infra] fake-pylsp ready: {pylsp:?} ({first_line})");
}
/// Dumps the fake-devcontainer / fake-LSP state to stderr for debugging.
///
/// Under a banner tagged with `label` this prints:
/// * the full contents of `fake_lsp_uris` and `exec_history` inside `state`;
/// * the trailing part (at most 4096 bytes) of the file returned by
///   `lsp_log_path("python")`.
///
/// Empty files are shown as `(empty)`; files that cannot be read are shown as
/// `(missing: <error>)`. This function never fails on a missing file.
fn dump_external_state(state: &Path, label: &str) {
    /// Upper bound, in bytes, on how much of the LSP log is echoed.
    const LSP_LOG_TAIL_BYTES: usize = 4096;

    /// Returns the longest suffix of `text` that is at most `max_bytes` long
    /// and starts on a UTF-8 character boundary.
    fn tail_on_char_boundary(text: &str, max_bytes: usize) -> &str {
        if text.len() <= max_bytes {
            return text;
        }
        // Slicing at a raw byte offset panics when the offset falls inside a
        // multi-byte character, so move forward to the next boundary.
        // `text.len()` is always a boundary, which bounds the loop.
        let mut start = text.len() - max_bytes;
        while !text.is_char_boundary(start) {
            start += 1;
        }
        &text[start..]
    }

    /// Prints `path`'s contents, optionally limited to the last `max_bytes`.
    fn dump_file(path: &Path, max_bytes: Option<usize>) {
        match fs::read_to_string(path) {
            Ok(s) if s.is_empty() => eprintln!("(empty)"),
            Ok(s) => {
                let shown = max_bytes.map_or(s.as_str(), |limit| tail_on_char_boundary(&s, limit));
                eprint!("{shown}");
            }
            Err(e) => eprintln!("(missing: {e})"),
        }
    }

    eprintln!("=== fake state dump [{label}] ===");
    for name in ["fake_lsp_uris", "exec_history"] {
        eprintln!("--- {name} ---");
        dump_file(&state.join(name), None);
    }

    let lsp_log_path = fresh::services::log_dirs::lsp_log_path("python");
    eprintln!("--- lsp stderr ({}) ---", lsp_log_path.display());
    dump_file(&lsp_log_path, Some(LSP_LOG_TAIL_BYTES));

    eprintln!("=== end dump ===");
}
/// Polls the harness until `cond` holds, dumping external state periodically.
///
/// Each iteration ticks and renders the harness, then evaluates `cond`. While
/// the condition is unmet, the function sleeps 50 ms of wall-clock time and
/// advances the harness clock by the same amount. Whenever 10 s have passed
/// since the previous dump (or since the start), it calls
/// [`dump_external_state`] with `state` and `label`.
///
/// There is no timeout: the function returns only once `cond` returns `true`.
fn wait_until_with_dumps<F>(harness: &mut EditorTestHarness, label: &str, state: &Path, mut cond: F)
where
    F: FnMut(&EditorTestHarness) -> bool,
{
    let poll_interval = Duration::from_millis(50);
    let dump_interval = Duration::from_secs(10);

    let began = Instant::now();
    let mut dumped_at = began;
    eprintln!("[wait] start: {label}");

    loop {
        harness.tick_and_render().unwrap();
        if cond(harness) {
            break;
        }

        if dumped_at.elapsed() >= dump_interval {
            eprintln!(
                "[wait] still waiting on {label} after {:.1}s",
                began.elapsed().as_secs_f64()
            );
            dump_external_state(state, label);
            dumped_at = Instant::now();
        }

        std::thread::sleep(poll_interval);
        harness.advance_time(poll_interval);
    }

    eprintln!(
        "[wait] satisfied: {label} (after {:.1}s)",
        began.elapsed().as_secs_f64()
    );
}
fn fake_lsp_bin_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("../../scripts/fake-lsp/bin")
.canonicalize()
.expect("scripts/fake-lsp/bin must exist relative to CARGO_MANIFEST_DIR")
}
/// Builds a throw-away workspace for the go-to-definition tests.
///
/// The workspace lives in a fresh temporary directory and contains:
/// * `.devcontainer/devcontainer.json`;
/// * `main.py`, which imports and calls `helper` from `util`;
/// * `util.py`, whose `def helper():` sits on line 5 (0-indexed);
/// * a `plugins/` directory populated via `copy_plugin_lib` and the
///   `devcontainer` plugin.
///
/// The locale is set to `"en"` before anything else happens.
///
/// Returns the `TempDir` guard together with the canonicalized workspace
/// path; the caller has to keep the guard alive for the files to persist.
fn set_up_workspace() -> (tempfile::TempDir, PathBuf) {
    fresh::i18n::set_locale("en");

    let temp = tempfile::tempdir().unwrap();
    let root = temp.path().canonicalize().unwrap();

    let devcontainer_dir = root.join(".devcontainer");
    fs::create_dir_all(&devcontainer_dir).unwrap();
    let devcontainer_json = r#"{
"name": "fake-lsp-go-to-def",
"image": "mcr.microsoft.com/devcontainers/base:ubuntu",
"remoteUser": "vscode",
"userEnvProbe": "loginInteractiveShell"
}"#;
    fs::write(devcontainer_dir.join("devcontainer.json"), devcontainer_json).unwrap();

    let python_sources = [
        (
            "main.py",
            "from util import helper\n\n\ndef main():\n helper()\n",
        ),
        (
            "util.py",
            "# util.py — host copy\n\
             # line 1\n\
             # line 2\n\
             # line 3\n\
             # line 4\n\
             def helper():\n\
             \treturn 'host-side definition target'\n",
        ),
    ];
    for (file_name, body) in python_sources {
        fs::write(root.join(file_name), body).unwrap();
    }

    let plugins_dir = root.join("plugins");
    fs::create_dir_all(&plugins_dir).unwrap();
    copy_plugin_lib(&plugins_dir);
    copy_plugin(&plugins_dir, "devcontainer");

    (temp, root)
}
/// Blocks until the "Dev Container Detected" popup is on screen.
///
/// First waits for a command named `%cmd.run_lifecycle` to appear in the
/// command registry, then fires the plugins-loaded hook, and finally waits for
/// the screen to show both the popup title and its "Reopen in Container"
/// action.
fn wait_for_attach_popup(harness: &mut EditorTestHarness) {
    harness
        .wait_until(|h| {
            let registry = h.editor().command_registry().read().unwrap();
            let commands = registry.get_all();
            commands.iter().any(|c| c.name == "%cmd.run_lifecycle")
        })
        .unwrap();

    harness.editor().fire_plugins_loaded_hook();

    harness
        .wait_until(|h| {
            let screen = h.screen_to_string();
            ["Dev Container Detected", "Reopen in Container"]
                .iter()
                .all(|needle| screen.contains(*needle))
        })
        .unwrap();
}
fn wait_for_container_authority(harness: &mut EditorTestHarness) -> String {
loop {
harness.tick_and_render().unwrap();
if let Some(auth) = harness.editor_mut().take_pending_authority() {
harness.editor_mut().set_boot_authority(auth);
return harness.editor().authority().display_label.clone();
}
if harness
.editor()
.authority()
.display_label
.starts_with("Container:")
{
return harness.editor().authority().display_label.clone();
}
std::thread::sleep(std::time::Duration::from_millis(50));
harness.advance_time(std::time::Duration::from_millis(50));
}
}
/// Reads `fake_lsp_uris` from the `state` directory.
///
/// Returns an empty string when the file cannot be read.
fn read_uri_log(state: &Path) -> String {
    let log_path = state.join("fake_lsp_uris");
    match fs::read_to_string(log_path) {
        Ok(contents) => contents,
        Err(_) => String::new(),
    }
}
/// End-to-end check that LSP URIs are translated in both directions while the
/// editor is attached to the fake dev container.
///
/// * host → container: the first `didOpen` line in the fake server's log must
///   carry `file:///workspaces/proj/main.py` and must not contain the host
///   workspace path.
/// * container → host: a definition response pointing at
///   `file:///workspaces/proj/util.py` (line 5, character 0) must activate the
///   host's `util.py` buffer with the cursor at that position.
///
/// The arrange phase is shared with the other go-to-definition tests through
/// `arrange_attached_session_with_open_main_py`, so this test gets the same
/// infra pre-flight checks and diagnostics.
#[test]
fn goto_definition_translates_uris_between_host_and_container() {
    let (_workspace_temp, workspace, mut harness, state) =
        arrange_attached_session_with_open_main_py();
    let host_util_py = workspace.join("util.py");

    // Make the fake server answer with a container-side URI for util.py.
    pin_fake_lsp_definition(&state, "file:///workspaces/proj/util.py", 5, 0);

    // Move 4 lines down (the `helper()` call line of main.py) and 6 columns
    // right, then press F12.
    trigger_goto_definition(&mut harness, 4, 6);

    wait_until_with_dumps(&mut harness, "active buffer == host util.py", &state, |h| {
        h.editor()
            .active_state()
            .buffer
            .file_path()
            .is_some_and(|p| p == host_util_py.as_path())
    });

    // --- host → container -------------------------------------------------
    let log = read_uri_log(&state);
    let first_did_open = log
        .lines()
        .find(|l| l.starts_with("didOpen "))
        .unwrap_or_else(|| {
            panic!("expected at least one didOpen URI in the fake-lsp log; full log:\n{}", log)
        });
    assert!(
        first_did_open.contains("file:///workspaces/proj/main.py"),
        "didOpen URI must use the in-container workspace path \
         `/workspaces/proj/main.py`, but got: {first_did_open:?}.\n\
         Full uri log:\n{log}\n\
         (This is the host→container URI translation gap: the editor \
         tells the LSP about a file at the host path, which the \
         in-container server can't see.)"
    );
    let host_workspace = workspace
        .to_str()
        .expect("temp workspace path must be valid UTF-8");
    assert!(
        !first_did_open.contains(host_workspace),
        "didOpen URI must NOT carry the host workspace path; got: \
         {first_did_open:?}. Full uri log:\n{log}"
    );

    // --- container → host -------------------------------------------------
    let active_path: Option<PathBuf> = harness
        .editor()
        .active_state()
        .buffer
        .file_path()
        .map(|p| p.to_path_buf());
    assert_eq!(
        active_path.as_deref(),
        Some(host_util_py.as_path()),
        "after Go-to-Definition the active buffer's host path must be \
         the host's util.py, not the literal container path returned \
         by the LSP. Got: {active_path:?}. \
         (Container→host URI translation gap.)"
    );

    let cursor_pos = harness.cursor_position();
    let (line, character) = harness
        .editor()
        .active_state()
        .buffer
        .position_to_lsp_position(cursor_pos);
    assert_eq!(
        (line, character),
        (5, 0),
        "after Go-to-Definition the cursor must be at line 5, col 0 \
         (matching the fake LSP's response). A line:0 col:0 result \
         usually means the editor opened a fresh empty buffer for \
         the container URI instead of resolving it to the host file."
    );

    std::env::remove_var("FAKE_DC_REMOTE_WORKSPACE");
}
/// Shared arrange step: builds a workspace, attaches to the fake dev
/// container, and opens `main.py` with the fake Python LSP running.
///
/// Returns `(workspace_temp, workspace, harness, state)`:
/// the `TempDir` guard for the workspace, the workspace path, the editor
/// harness, and the path of the fake devcontainer's state directory.
///
/// Side effects on the process environment: sets `FAKE_DC_REMOTE_WORKSPACE`
/// to `/workspaces/proj` and prepends the fake-LSP bin directory to `PATH`
/// when it is not already listed there.
///
/// # Panics
///
/// Panics if the infra checks fail, if the container authority is not
/// reached, or if `command_exists("fake-pylsp")` reports `false` through the
/// active authority's spawner (diagnostics are printed to stderr first).
fn arrange_attached_session_with_open_main_py() -> (
tempfile::TempDir,
std::path::PathBuf,
EditorTestHarness,
std::path::PathBuf,
) {
init_tracing_from_env();
assert_test_infra();
let (workspace_temp, workspace) = set_up_workspace();
let main_py = workspace.join("main.py");
// Register a single auto-started Python language server that runs the
// `fake-pylsp` command.
let mut config = fresh::config::Config::default();
config.lsp.insert(
"python".to_string(),
fresh::types::LspLanguageConfig::Multi(vec![fresh::services::lsp::LspServerConfig {
command: "fake-pylsp".to_string(),
args: vec![],
enabled: true,
auto_start: true,
process_limits: fresh::services::process_limits::ProcessLimits::default(),
initialization_options: None,
env: Default::default(),
language_id_overrides: Default::default(),
root_markers: vec![".devcontainer".to_string(), ".git".to_string()],
name: None,
only_features: None,
except_features: None,
}]),
);
// 160x40 harness rooted at the workspace, with the fake devcontainer enabled.
let mut harness = EditorTestHarness::create(
160,
40,
HarnessOptions::new()
.with_working_dir(workspace.clone())
.with_config(config)
.with_fake_devcontainer()
.without_empty_plugins_dir(),
)
.unwrap();
// NOTE(review): presumably read by the fake devcontainer tooling as the
// in-container workspace root — confirm against the fake scripts.
std::env::set_var("FAKE_DC_REMOTE_WORKSPACE", "/workspaces/proj");
let state = harness
.fake_devcontainer_state()
.expect("with_fake_devcontainer was set")
.to_path_buf();
// Make `fake-pylsp` resolvable on the host PATH, without adding the
// directory a second time if it is already there.
let fake_lsp_bin = fake_lsp_bin_dir();
let host_path = std::env::var("PATH").unwrap_or_default();
if !host_path
.split(':')
.any(|p| Path::new(p) == fake_lsp_bin.as_path())
{
std::env::set_var("PATH", format!("{}:{}", fake_lsp_bin.display(), host_path));
}
// Write the `probe_response` state file: a PATH ending in the fake-LSP bin
// directory plus HOME and LANG entries.
fs::write(
state.join("probe_response"),
format!(
"PATH=/home/vscode/.local/bin:/usr/local/bin:/usr/bin:/usr/sbin:/bin:/sbin:{}\n\
HOME=/home/vscode\n\
LANG=C.UTF-8\n",
fake_lsp_bin.display()
),
)
.expect("write probe_response");
// Wait for the attach popup, press Enter on it, then wait until the editor
// reports a container authority.
harness.tick_and_render().unwrap();
wait_for_attach_popup(&mut harness);
harness
.send_key(KeyCode::Enter, KeyModifiers::NONE)
.unwrap();
let label = wait_for_container_authority(&mut harness);
assert!(
label.starts_with("Container:"),
"expected container authority, got {label:?}"
);
// Pre-flight: ask the active authority's long-running spawner whether it
// can find `fake-pylsp`. Failing here with diagnostics avoids waiting
// forever on an LSP that never starts.
{
let spawner = harness.editor().authority().long_running_spawner.clone();
// A dedicated runtime is created just to block on the async probe, and
// is dropped right after.
let rt = tokio::runtime::Runtime::new().expect("tokio runtime for command_exists probe");
let exists = rt.block_on(async move { spawner.command_exists("fake-pylsp").await });
drop(rt);
if !exists {
eprintln!("[infra] command_exists('fake-pylsp') returned false");
eprintln!("[infra] host fake_lsp_bin: {}", fake_lsp_bin.display());
eprintln!(
"[infra] host PATH: {}",
std::env::var("PATH").unwrap_or_default()
);
// Diagnostic 1: can the host shell resolve the command?
let host_probe = std::process::Command::new("sh")
.args(["-c", "command -v fake-pylsp"])
.output();
match host_probe {
Ok(out) => eprintln!(
"[infra] host `sh -c 'command -v fake-pylsp'`: status={}, stdout={:?}, stderr={:?}",
out.status,
String::from_utf8_lossy(&out.stdout).trim(),
String::from_utf8_lossy(&out.stderr).trim(),
),
Err(e) => eprintln!("[infra] host `sh -c 'command -v fake-pylsp'`: spawn failed: {e}"),
}
// Diagnostic 2: run the same lookup via `docker exec` with the PATH
// value that was written to `probe_response`.
let shim_probe = std::process::Command::new("docker")
.args([
"exec",
"-e",
&format!(
"PATH=/home/vscode/.local/bin:/usr/local/bin:/usr/bin:/usr/sbin:/bin:/sbin:{}",
fake_lsp_bin.display()
),
"fake-id",
"sh",
"-c",
"echo PATH=$PATH; command -v fake-pylsp; echo exit=$?",
])
.output();
match shim_probe {
Ok(out) => eprintln!(
"[infra] direct fake-docker probe: status={}, stdout={:?}, stderr={:?}",
out.status,
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr),
),
Err(e) => eprintln!("[infra] direct fake-docker probe: spawn failed: {e}"),
}
dump_external_state(&state, "command_exists pre-flight");
panic!(
"infra: command_exists('fake-pylsp') returned false through the \
active authority's spawner. The editor's LSP spawn will fail \
the same way and the test would otherwise hang. See diagnostics above."
);
}
eprintln!("[infra] command_exists('fake-pylsp') OK through authority spawner");
}
// Open main.py and wait until the URI log shows both an `initialize` line
// and a `didOpen` line mentioning main.py.
harness.open_file(&main_py).unwrap();
wait_until_with_dumps(&mut harness, "fake-pylsp initialize", &state, |_| {
read_uri_log(&state)
.lines()
.any(|l| l.starts_with("initialize "))
});
wait_until_with_dumps(&mut harness, "fake-pylsp didOpen main.py", &state, |_| {
read_uri_log(&state)
.lines()
.any(|l| l.starts_with("didOpen ") && l.contains("main.py"))
});
(workspace_temp, workspace, harness, state)
}
/// Writes the definition target into the `state` directory.
///
/// Three files are written, each holding its value followed by a newline:
/// `fake_lsp_definition_uri`, `fake_lsp_definition_line` and
/// `fake_lsp_definition_character`.
///
/// # Panics
///
/// Panics if any of the files cannot be written.
fn pin_fake_lsp_definition(state: &Path, uri: &str, line: u32, character: u32) {
    let pinned = [
        (
            "fake_lsp_definition_uri",
            uri.to_string(),
            "write fake_lsp_definition_uri",
        ),
        (
            "fake_lsp_definition_line",
            line.to_string(),
            "write fake_lsp_definition_line",
        ),
        (
            "fake_lsp_definition_character",
            character.to_string(),
            "write fake_lsp_definition_character",
        ),
    ];
    for (file_name, mut value, failure) in pinned {
        value.push('\n');
        fs::write(state.join(file_name), value).expect(failure);
    }
}
/// Stores `content` under `state/container_fs/<container_path>`.
///
/// `container_path` has its leading `/` stripped and is then mirrored below
/// the `container_fs` directory; missing parent directories are created.
///
/// # Panics
///
/// Panics if `container_path` does not start with `/`, if the parent
/// directories cannot be created, or if the file cannot be written.
fn stash_container_file(state: &Path, container_path: &str, content: &str) {
    let relative = container_path
        .strip_prefix('/')
        .expect("container_path must be absolute");
    let stash = state.join("container_fs").join(relative);

    let parent = stash.parent().expect("non-root container path");
    fs::create_dir_all(parent).unwrap();

    if let Err(e) = fs::write(&stash, content) {
        panic!("stash {stash:?}: {e}");
    }
}
/// Moves the cursor and presses F12.
///
/// Sends `down` Down-arrow keys followed by `right` Right-arrow keys, lets the
/// harness process async work and render, then sends F12 with no modifiers.
fn trigger_goto_definition(harness: &mut EditorTestHarness, down: usize, right: usize) {
    // All vertical moves come first, then the horizontal ones.
    let cursor_moves = std::iter::repeat(KeyCode::Down)
        .take(down)
        .chain(std::iter::repeat(KeyCode::Right).take(right));
    for key in cursor_moves {
        harness.send_key(key, KeyModifiers::NONE).unwrap();
    }

    harness.process_async_and_render().unwrap();
    harness
        .send_key(KeyCode::F(12), KeyModifiers::NONE)
        .unwrap();
}
/// Go-to-Definition into a file that was stashed only in the fake container's
/// filesystem must open a buffer at the container path.
///
/// The buffer is expected to hold the stashed content, be read-only, and have
/// the cursor at the position reported by the fake LSP (line 3, character 0).
#[test]
fn goto_definition_into_container_only_file_opens_read_only_buffer() {
    let (_workspace_temp, _workspace, mut harness, state) =
        arrange_attached_session_with_open_main_py();

    let container_path = "/home/vscode/.local/lib/python3.12/site-packages/flask/app.py";
    let container_content = "# flask/app.py — fetched from container\n\
                             # line 1\n\
                             # line 2\n\
                             def some_app_helper():\n\
                             \treturn 'this content lives only in the container'\n";
    let expected_path = Path::new(container_path);

    // Arrange: the file exists only in the stash, and the fake LSP points at it.
    stash_container_file(&state, container_path, container_content);
    let definition_uri = format!("file://{container_path}");
    pin_fake_lsp_definition(&state, &definition_uri, 3, 0);

    // Act.
    trigger_goto_definition(&mut harness, 4, 6);
    wait_until_with_dumps(
        &mut harness,
        "active buffer == fetched container path",
        &state,
        |h| {
            let buffer = &h.editor().active_state().buffer;
            buffer.file_path().is_some_and(|p| p == expected_path)
        },
    );

    // Assert: path, content, read-only flag, cursor position.
    let active_path: Option<PathBuf> = harness
        .editor()
        .active_state()
        .buffer
        .file_path()
        .map(|p| p.to_path_buf());
    assert_eq!(
        active_path.as_deref(),
        Some(expected_path),
        "active buffer path must be the container path; got {active_path:?}"
    );

    let content = harness
        .editor()
        .active_state()
        .buffer
        .to_string()
        .expect("buffer content readable");
    assert_eq!(
        content, container_content,
        "active buffer content must match what `docker exec cat` returned"
    );

    let read_only = harness.editor().is_active_buffer_read_only();
    assert!(
        read_only,
        "container-fetched buffers must be read-only (no host-side writeback)"
    );

    let cursor_pos = harness.cursor_position();
    let lsp_position = harness
        .editor()
        .active_state()
        .buffer
        .position_to_lsp_position(cursor_pos);
    let (line, character) = lsp_position;
    assert_eq!(
        (line, character),
        (3, 0),
        "cursor must land at the LSP-reported line/character. (0,0) \
         usually means the editor created an empty buffer instead of \
         fetching the container file."
    );

    std::env::remove_var("FAKE_DC_REMOTE_WORKSPACE");
}
/// Go-to-Definition into a URI that nothing was stashed for must show a
/// "could not open" message on screen and must not leave the active buffer
/// pointing at that path.
#[test]
fn goto_definition_to_unreachable_uri_surfaces_error_message() {
    let (_workspace_temp, _workspace, mut harness, state) =
        arrange_attached_session_with_open_main_py();

    let unreachable = "/this/path/exists/nowhere/ghost.py";
    let ghost_uri = format!("file://{unreachable}");
    pin_fake_lsp_definition(&state, &ghost_uri, 7, 0);

    trigger_goto_definition(&mut harness, 4, 6);
    wait_until_with_dumps(&mut harness, "status line: 'could not open'", &state, |h| {
        let screen = h.screen_to_string();
        screen.contains("could not open")
    });

    let active_path: Option<PathBuf> = harness
        .editor()
        .active_state()
        .buffer
        .file_path()
        .map(|p| p.to_path_buf());
    let phantom = Path::new(unreachable);
    assert_ne!(
        active_path.as_deref(),
        Some(phantom),
        "Goto-Def into an unreachable URI must NOT open a phantom \
         buffer at that path. Got: {active_path:?}"
    );

    std::env::remove_var("FAKE_DC_REMOTE_WORKSPACE");
}