mod cli;
use aft::bash_background::BgTaskRegistry;
use aft::config::Config;
use aft::context::{App, AppContext, SemanticIndexStatus};
use aft::log_ctx;
use aft::lsp::child_registry::LspChildRegistry;
use aft::protocol::{EchoParams, PushFrame, RawRequest, Response};
use aft::runtime_registry::RuntimeRegistry;
use std::io::{self, BufRead, Write};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};
fn parse_subc_arg(
args: impl IntoIterator<Item = std::ffi::OsString>,
) -> Option<std::path::PathBuf> {
let mut args = args.into_iter();
while let Some(arg) = args.next() {
if arg == "--subc" {
return args.next().map(std::path::PathBuf::from);
}
if let Some(raw) = arg.to_str().and_then(|a| a.strip_prefix("--subc=")) {
if !raw.is_empty() {
return Some(std::path::PathBuf::from(raw));
}
}
}
None
}
fn main() {
if std::env::args().any(|a| a == "--version" || a == "-V") {
println!("aft {}", env!("CARGO_PKG_VERSION"));
return;
}
if std::env::args().nth(1).as_deref() == Some("migrate-storage") {
let args = std::env::args_os().skip(2).collect::<Vec<_>>();
match aft::migrate_storage::parse_cli_args(args) {
Ok(args) => {
let status = aft::migrate_storage::run_with_options(
args,
aft::migrate_storage::Options::default(),
);
std::process::exit(i32::from(status.code()));
}
Err(message) => {
eprintln!("{message}");
std::process::exit(2);
}
}
}
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.format(|buf, record| {
use std::io::Write;
let prefix = if record.target().starts_with("aft::lsp")
|| record.target().starts_with("aft_lsp")
{
"[aft-lsp]"
} else {
"[aft]"
};
writeln!(buf, "{} {}", prefix, record.args())
})
.init();
if std::env::args().nth(1).as_deref() == Some("warmup") {
let args = std::env::args_os().skip(2).collect::<Vec<_>>();
match cli::warmup::run(args) {
Ok(()) => return,
Err(error) => {
eprintln!("{error}");
std::process::exit(error.exit_code());
}
}
}
if let Some(connection_file) = parse_subc_arg(std::env::args_os().skip(1)) {
aft::slog_info!("subc mode, pid {}", std::process::id());
let app = App::default_shared();
let ctx = Arc::new(AppContext::from_app(Arc::clone(&app), Config::default()));
let executor = Arc::new(aft::executor::Executor::new());
let user_config_path = aft::subc_config::cortexkit_user_config_path();
match aft::subc::run_subc_mode(&connection_file, ctx, executor, dispatch, user_config_path)
{
Ok(()) => return,
Err(error) => {
aft::slog_error!("subc attach failed: {error}");
std::process::exit(1);
}
}
}
aft::slog_info!("started, pid {}", std::process::id());
let app = App::default_shared();
let ctx = AppContext::from_app(Arc::clone(&app), Config::default());
let registry = RuntimeRegistry::standalone(app, ctx);
{
let bg_registries = signal_bg_registries(®istry);
let lsp_children = registry.app().lsp_child_registry();
install_signal_handler(bg_registries, lsp_children);
}
for runtime in registry.iter() {
install_bash_compressor(runtime);
}
let stdout_writer = registry.current().stdout_writer();
let shutdown_requested = Arc::new(AtomicBool::new(false));
let shutdown_from_push = Arc::clone(&shutdown_requested);
registry
.current()
.set_progress_sender(Some(Arc::new(Box::new(move |frame: PushFrame| {
let Ok(mut writer) = stdout_writer.lock() else {
aft::slog_error!("stdout push frame lock poisoned; shutting down bridge");
shutdown_from_push.store(true, Ordering::SeqCst);
return;
};
write_push_frame_or_request_shutdown(&mut *writer, &frame, &shutdown_from_push);
}))));
const DRAIN_INTERVAL: Duration = Duration::from_millis(250);
let (line_tx, line_rx) = mpsc::channel::<io::Result<String>>();
thread::spawn(move || {
let stdin = io::stdin();
let reader = stdin.lock();
for line_result in reader.lines() {
if line_tx.send(line_result).is_err() {
break;
}
}
});
loop {
if shutdown_requested.load(Ordering::SeqCst) {
break;
}
let line_result = match line_rx.recv_timeout(DRAIN_INTERVAL) {
Ok(result) => result,
Err(mpsc::RecvTimeoutError::Timeout) => {
drain_runtime_events(®istry);
if shutdown_requested.load(Ordering::SeqCst) {
break;
}
continue;
}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
};
let line = match line_result {
Ok(l) => l,
Err(e) => {
aft::slog_error!("stdin read error: {}", e);
break;
}
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let mut shutdown_after_response = false;
let response = match serde_json::from_str::<RawRequest>(trimmed) {
Ok(req) => {
drain_runtime_events(®istry);
let request_id = req.id.clone();
let session_id = req.session().to_string();
let command = req.command.clone();
let session_id_for_log = req.session_id.clone();
let runtime = registry.current();
let dispatch_result = catch_unwind(AssertUnwindSafe(|| {
log_ctx::with_session(session_id_for_log, || dispatch(req, runtime))
}));
match dispatch_result {
Ok(mut response) => {
aft::response_finalize::attach_bg_completions(
&mut response,
runtime,
&session_id,
&command,
);
aft::response_finalize::attach_status_bar(&mut response, runtime, &command);
response
}
Err(payload) => {
shutdown_after_response = true;
dispatch_panic_response(request_id, &command, payload.as_ref())
}
}
}
Err(e) => {
aft::slog_error!("parse error: {} — input: {}", e, trimmed);
Response::error(
"_parse_error",
"parse_error",
format!("failed to parse request: {}", e),
)
}
};
if let Err(e) = write_response(registry.current(), &response) {
aft::slog_error!("stdout write error: {}", e);
break;
}
drain_configure_warning_events_for_registry(®istry);
if shutdown_after_response || shutdown_requested.load(Ordering::SeqCst) {
break;
}
}
for runtime in registry.iter() {
runtime.lsp().shutdown_all();
runtime.bash_background().detach();
}
aft::slog_info!("stdin closed, shutting down");
}
fn drain_runtime_events(registry: &RuntimeRegistry) {
for runtime in registry.iter() {
aft::runtime_drain::drain_configure_warning_events(runtime);
aft::runtime_drain::drain_search_index_events(runtime);
aft::runtime_drain::drain_callgraph_store_events(runtime);
aft::runtime_drain::drain_semantic_index_events(runtime);
aft::runtime_drain::drain_semantic_refresh_events(runtime);
aft::runtime_drain::drain_inspect_events(runtime);
aft::runtime_drain::drain_watcher_events(runtime);
aft::runtime_drain::drain_lsp_events(runtime);
}
}
fn drain_configure_warning_events_for_registry(registry: &RuntimeRegistry) {
for runtime in registry.iter() {
aft::runtime_drain::drain_configure_warning_events(runtime);
}
}
fn signal_bg_registries(registry: &RuntimeRegistry) -> Vec<BgTaskRegistry> {
registry
.iter()
.map(|runtime| runtime.bash_background().clone())
.collect()
}
fn install_bash_compressor(runtime: &AppContext) {
let filter_registry_handle = runtime.shared_filter_registry();
let compress_flag = runtime.bash_compress_flag();
runtime.bash_background().set_compressor_with_exit_code(
move |command: &str, output: String, exit_code: Option<i32>| {
if !compress_flag.load(std::sync::atomic::Ordering::Relaxed) {
return aft::compress::CompressionResult::new(output);
}
let registry_guard = match filter_registry_handle.read() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
aft::compress::compress_with_registry_exit_code(
command,
&output,
exit_code,
®istry_guard,
)
},
);
}
#[cfg(unix)]
fn install_signal_handler(bg_registries: Vec<BgTaskRegistry>, lsp_children: LspChildRegistry) {
let signals = signal_hook::iterator::Signals::new([
signal_hook::consts::SIGINT,
signal_hook::consts::SIGTERM,
]);
let Ok(mut signals) = signals else {
if let Err(error) = signals {
aft::slog_error!("failed to install signal handlers: {error}");
}
return;
};
std::thread::spawn(move || {
if let Some(signal) = signals.forever().next() {
for registry in &bg_registries {
registry.detach();
}
let killed = lsp_children.kill_all();
if killed > 0 {
aft::slog_info!("signal {}: killed {} LSP child process(es)", signal, killed);
}
std::process::exit(128 + signal);
}
});
}
#[cfg(not(unix))]
static WINDOWS_SIGNAL_REGISTRIES: std::sync::OnceLock<(Vec<BgTaskRegistry>, LspChildRegistry)> =
std::sync::OnceLock::new();
#[cfg(windows)]
unsafe extern "system" fn windows_console_handler(ctrl_type: u32) -> i32 {
const CTRL_C_EVENT: u32 = 0;
const CTRL_BREAK_EVENT: u32 = 1;
const CTRL_CLOSE_EVENT: u32 = 2;
const CTRL_LOGOFF_EVENT: u32 = 5;
const CTRL_SHUTDOWN_EVENT: u32 = 6;
if matches!(
ctrl_type,
CTRL_C_EVENT
| CTRL_BREAK_EVENT
| CTRL_CLOSE_EVENT
| CTRL_LOGOFF_EVENT
| CTRL_SHUTDOWN_EVENT
) {
if let Some((bg_registries, lsp_children)) = WINDOWS_SIGNAL_REGISTRIES.get() {
for registry in bg_registries {
registry.detach();
}
let killed = lsp_children.kill_all();
if killed > 0 {
aft::slog_info!(
"windows console event {ctrl_type}: killed {killed} LSP child process(es)"
);
}
}
1
} else {
0
}
}
#[cfg(windows)]
#[link(name = "Kernel32")]
unsafe extern "system" {
fn SetConsoleCtrlHandler(
handler: Option<unsafe extern "system" fn(u32) -> i32>,
add: i32,
) -> i32;
}
#[cfg(not(unix))]
fn install_signal_handler(bg_registries: Vec<BgTaskRegistry>, lsp_children: LspChildRegistry) {
#[cfg(windows)]
{
let _ = WINDOWS_SIGNAL_REGISTRIES.set((bg_registries, lsp_children));
let ok = unsafe { SetConsoleCtrlHandler(Some(windows_console_handler), 1) };
if ok == 0 {
aft::slog_error!("failed to install Windows console control handler");
}
}
#[cfg(not(windows))]
{
let _ = (bg_registries, lsp_children);
}
}
fn write_push_frame_or_request_shutdown(
writer: &mut impl Write,
frame: &PushFrame,
shutdown_requested: &AtomicBool,
) {
if let Err(error) = write_push_frame(writer, frame) {
aft::slog_error!(
"stdout push frame write error: {}; shutting down bridge",
error
);
shutdown_requested.store(true, Ordering::SeqCst);
}
}
fn dispatch_panic_response(
request_id: impl Into<String>,
command: &str,
payload: &(dyn std::any::Any + Send),
) -> Response {
let panic_message = panic_payload_message(payload);
aft::slog_error!(
"command '{}' panicked: {}; shutting down bridge",
command,
panic_message
);
Response::error(
request_id,
"internal_error",
format!("command '{command}' panicked: {panic_message}"),
)
}
fn panic_payload_message(payload: &(dyn std::any::Any + Send)) -> String {
if let Some(message) = payload.downcast_ref::<&'static str>() {
(*message).to_string()
} else if let Some(message) = payload.downcast_ref::<String>() {
message.clone()
} else {
"unknown panic payload".to_string()
}
}
fn dispatch(req: RawRequest, ctx: &AppContext) -> Response {
match req.command.as_str() {
"ping" => Response::success(&req.id, serde_json::json!({ "command": "pong" })),
"version" => Response::success(
&req.id,
serde_json::json!({ "version": env!("CARGO_PKG_VERSION") }),
),
"echo" => handle_echo(&req),
"bash" => aft::commands::bash::handle(&req, ctx),
"bash_drain_completions" => aft::commands::bash_drain_completions::handle(&req, ctx),
"bash_ack_completions" => aft::commands::bash_drain_completions::handle_ack(&req, ctx),
"bash_status" => aft::commands::bash_status::handle(&req, ctx),
"bash_notify" => aft::commands::bash_notify::handle(&req, ctx),
"bash_unnotify" => aft::commands::bash_notify::handle_unnotify(&req, ctx),
"bash_promote" => aft::commands::bash_promote::handle(&req, ctx),
"bash_regex_match" => aft::commands::bash_regex_match::handle(&req),
"bash_kill" => aft::commands::bash_kill::handle(&req, ctx),
"bash_write" => aft::commands::bash_write::handle(&req, ctx),
"db_get_state" => aft::commands::state::handle_db_get_state(&req, ctx),
"db_set_state" => aft::commands::state::handle_db_set_state(&req, ctx),
"db_get_host_state" => aft::commands::state::handle_db_get_host_state(&req, ctx),
"db_set_host_state" => aft::commands::state::handle_db_set_host_state(&req, ctx),
"outline" => aft::commands::outline::handle_outline(&req, ctx),
"zoom" => aft::commands::zoom::handle_zoom(&req, ctx),
"read" => aft::commands::read::handle_read(&req, ctx),
"undo" | "undo_preview" | "edit_history" | "checkpoint" | "checkpoint_paths"
| "restore_checkpoint" | "list_checkpoints"
if ctx.config().backup.enabled == Some(false) =>
{
Response::error(
&req.id,
"backups_disabled",
"backups disabled by user config",
)
}
"undo" => aft::commands::undo::handle_undo(&req, ctx),
"undo_preview" => aft::commands::undo::handle_undo_preview(&req, ctx),
"edit_history" => aft::commands::edit_history::handle_edit_history(&req, ctx),
"checkpoint" => aft::commands::checkpoint::handle_checkpoint(&req, ctx),
"checkpoint_paths" => aft::commands::checkpoint::handle_checkpoint_paths(&req, ctx),
"restore_checkpoint" => {
aft::commands::restore_checkpoint::handle_restore_checkpoint(&req, ctx)
}
"list_checkpoints" => aft::commands::list_checkpoints::handle_list_checkpoints(&req, ctx),
"write" => aft::commands::write::handle_write(&req, ctx),
"delete_file" => aft::commands::delete_file::handle_delete_file(&req, ctx),
"move_file" => aft::commands::move_file::handle_move_file(&req, ctx),
"edit_symbol" => aft::commands::edit_symbol::handle_edit_symbol(&req, ctx),
"edit_match" => aft::commands::edit_match::handle_edit_match(&req, ctx),
"batch" => aft::commands::batch::handle_batch(&req, ctx),
"add_import" => aft::commands::add_import::handle_add_import(&req, ctx),
"remove_import" => aft::commands::remove_import::handle_remove_import(&req, ctx),
"organize_imports" => aft::commands::organize_imports::handle_organize_imports(&req, ctx),
"configure" => aft::commands::configure::handle_configure(&req, ctx),
"glob" => aft::commands::glob::handle_glob(&req, ctx),
"grep" => aft::commands::grep::handle_grep(&req, ctx),
"semantic_search" => {
if let Some(response) = wait_for_semantic_index_before_search(&req, ctx) {
response
} else {
aft::commands::semantic_search::handle_semantic_search(&req, ctx)
}
}
"status" => aft::commands::status::handle_status(&req, ctx),
"list_filters" => aft::commands::list_filters::handle_list_filters(&req, ctx),
"trust_filter_project" => {
aft::commands::trust_filter_project::handle_trust_filter_project(&req, ctx)
}
"untrust_filter_project" => {
aft::commands::untrust_filter_project::handle_untrust_filter_project(&req, ctx)
}
"call_tree" => aft::commands::call_tree::handle_call_tree(&req, ctx),
"callers" => aft::commands::callers::handle_callers(&req, ctx),
"trace_to" => aft::commands::trace_to::handle_trace_to(&req, ctx),
"trace_to_symbol" => aft::commands::trace_to_symbol::handle_trace_to_symbol(&req, ctx),
"impact" => aft::commands::impact::handle_impact(&req, ctx),
"trace_data" => aft::commands::trace_data::handle_trace_data(&req, ctx),
"move_symbol" => aft::commands::move_symbol::handle_move_symbol(&req, ctx),
"extract_function" => aft::commands::extract_function::handle_extract_function(&req, ctx),
"inline_symbol" => aft::commands::inline_symbol::handle_inline_symbol(&req, ctx),
"inspect" => aft::commands::inspect::handle_inspect(&req, ctx),
"inspect_tier2_run" => aft::commands::inspect::handle_inspect_tier2_run(&req, ctx),
"git_conflicts" => aft::commands::conflicts::handle_git_conflicts(ctx, &req),
"ast_search" => aft::commands::ast_search::handle_ast_search(&req, ctx),
"ast_replace" => aft::commands::ast_replace::handle_ast_replace(&req, ctx),
"lsp_diagnostics" => aft::commands::lsp_diagnostics::handle_lsp_diagnostics(&req, ctx),
"lsp_inspect" => aft::commands::lsp_inspect::handle_lsp_inspect(&req, ctx),
"lsp_hover" => aft::commands::lsp_hover::handle_lsp_hover(&req, ctx),
"lsp_goto_definition" => {
aft::commands::lsp_goto_definition::handle_lsp_goto_definition(&req, ctx)
}
"lsp_find_references" => {
aft::commands::lsp_find_references::handle_lsp_find_references(&req, ctx)
}
"lsp_prepare_rename" => {
aft::commands::lsp_prepare_rename::handle_lsp_prepare_rename(&req, ctx)
}
"lsp_rename" => aft::commands::lsp_rename::handle_lsp_rename(&req, ctx),
"snapshot" => handle_snapshot(&req, ctx),
_ => {
aft::slog_warn!("unknown command: {}", req.command);
Response::error(
&req.id,
"unknown_command",
format!("unknown command: {}", req.command),
)
}
}
}
fn handle_echo(req: &RawRequest) -> Response {
match serde_json::from_value::<EchoParams>(req.params.clone()) {
Ok(params) => Response::success(&req.id, serde_json::json!({ "message": params.message })),
Err(e) => Response::error(
&req.id,
"invalid_request",
format!("echo: invalid params: {}", e),
),
}
}
fn wait_for_semantic_index_before_search(req: &RawRequest, ctx: &AppContext) -> Option<Response> {
if std::env::var_os("AFT_WAIT_FOR_SEMANTIC_READY").is_none() || !ctx.config().semantic_search {
return None;
}
let timeout_ms = std::env::var("AFT_WAIT_FOR_SEMANTIC_READY_MS")
.ok()
.and_then(|value| value.parse::<u64>().ok())
.filter(|value| *value > 0)
.unwrap_or(600_000);
let deadline = Instant::now() + Duration::from_millis(timeout_ms);
loop {
aft::runtime_drain::drain_search_index_events(ctx);
aft::runtime_drain::drain_semantic_index_events(ctx);
match ctx
.semantic_index_status()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
{
SemanticIndexStatus::Ready { .. }
| SemanticIndexStatus::Disabled
| SemanticIndexStatus::Failed(_) => return None,
SemanticIndexStatus::Building { stage, .. } => {
if Instant::now() >= deadline {
return Some(Response::error(
&req.id,
"semantic_index_timeout",
format!(
"semantic index did not become ready before semantic_search within {timeout_ms}ms (stage: {stage})"
),
));
}
}
}
thread::sleep(Duration::from_millis(250));
}
}
fn handle_snapshot(req: &RawRequest, ctx: &AppContext) -> Response {
let file = match req.params.get("file").and_then(|v| v.as_str()) {
Some(f) => f,
None => {
return Response::error(
&req.id,
"invalid_request",
"snapshot: missing required param 'file'",
);
}
};
let path = match ctx.validate_path(&req.id, std::path::Path::new(file)) {
Ok(p) => p,
Err(resp) => return resp,
};
let path = path.as_path();
let mut backup = ctx.backup().lock();
match backup.snapshot(req.session(), path, "manual snapshot") {
Ok(id) => Response::success(&req.id, serde_json::json!({ "backup_id": id })),
Err(e) => Response::error(&req.id, e.code(), e.to_string()),
}
}
fn write_response(ctx: &AppContext, response: &Response) -> io::Result<()> {
let stdout_writer = ctx.stdout_writer();
let mut writer = stdout_writer
.lock()
.map_err(|_| io::Error::other("stdout writer lock poisoned"))?;
serde_json::to_writer(&mut *writer, response)?;
writer.write_all(b"\n")?;
writer.flush()?;
Ok(())
}
fn write_push_frame(writer: &mut impl Write, frame: &PushFrame) -> io::Result<()> {
serde_json::to_writer(&mut *writer, frame)?;
writer.write_all(b"\n")?;
writer.flush()?;
Ok(())
}
#[cfg(test)]
mod signal_handler_tests {
use super::{signal_bg_registries, App, AppContext, Config, RuntimeRegistry};
use std::sync::Arc;
#[test]
fn signal_bg_registry_collection_includes_standalone_actor() {
let app = App::default_shared();
let ctx = AppContext::from_app(Arc::clone(&app), Config::default());
let registry = RuntimeRegistry::standalone(app, ctx);
assert_eq!(signal_bg_registries(®istry).len(), 1);
}
}
#[cfg(test)]
mod watcher_filter_tests {
use super::{dispatch, dispatch_panic_response, write_push_frame_or_request_shutdown};
use aft::config::{BackupConfig, Config};
use aft::context::{
AppContext, CallgraphStoreAccess, SemanticIndexEvent, SemanticIndexStatus,
SemanticRefreshEvent, SemanticRefreshRequest, SemanticRefreshWorkerSlot,
};
use aft::harness::Harness;
use aft::lsp::diagnostics::{DiagnosticSeverity, StoredDiagnostic};
use aft::lsp::registry::ServerKind;
use aft::lsp::roots::ServerKey;
use aft::parser::TreeSitterProvider;
use aft::protocol::{ConfigureWarningsFrame, PushFrame, RawRequest, Response};
use aft::response_finalize::attach_status_bar;
use aft::runtime_drain::{
drain_semantic_refresh_events, drain_watcher_events,
record_semantic_refresh_transient_failure, schedule_semantic_refresh_retry,
semantic_refresh_circuit_is_open, semantic_refresh_probe_is_scheduled_for_test,
semantic_refresh_transient_failure_count_for_test, watcher_path_is_callgraph_indexed,
BREAKER_TRIP_THRESHOLD, MAX_RETRY_ATTEMPTS, WATCHER_BATCH_INLINE_CAP,
};
use aft::semantic_index::SemanticIndex;
use aft::watcher_filter::{
watcher_event_invalidates, FilteredWatcherPaths, WatcherDispatchEvent, WatcherFilterConfig,
};
use notify::event::{
AccessKind, AccessMode, CreateKind, DataChange, MetadataKind, ModifyKind, RemoveKind,
RenameMode,
};
use notify::EventKind;
use tempfile::TempDir;
const RECV_DISPATCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);
fn make_ctx_with_root(root: &std::path::Path) -> AppContext {
AppContext::new(
Box::new(TreeSitterProvider::new()),
Config {
project_root: Some(root.to_path_buf()),
..Config::default()
},
)
}
fn install_watcher_rx(ctx: &AppContext) -> crossbeam_channel::Sender<WatcherDispatchEvent> {
let (tx, rx) = crossbeam_channel::unbounded();
*ctx.watcher_rx().lock() = Some(rx);
tx
}
fn watcher_paths_event(path: std::path::PathBuf) -> WatcherDispatchEvent {
WatcherDispatchEvent::Paths(vec![path])
}
fn filter_watcher_raw_paths<I>(ctx: &AppContext, raw_paths: I) -> FilteredWatcherPaths
where
I: IntoIterator<Item = std::path::PathBuf>,
{
let root = ctx
.canonical_cache_root_opt()
.or_else(|| ctx.config().project_root.clone())
.expect("test context has project root");
let root = std::fs::canonicalize(&root).unwrap_or(root);
let config = WatcherFilterConfig::new(root, ctx.git_common_dir());
aft::watcher_filter::filter_watcher_raw_paths_for_test(
&config,
&ctx.shared_gitignore(),
raw_paths,
)
}
#[test]
fn callgraph_watcher_gate_covers_all_indexed_languages() {
use std::path::Path;
for ok in [
"Foo.java", "x.cpp", "y.c", "Svc.cs", "m.kt", "a.rb", "z.php", "s.scala", "C.sol",
"app.ts", "main.rs", "h.go", "p.py",
] {
assert!(
watcher_path_is_callgraph_indexed(Path::new(ok)),
"{ok} should be callgraph-indexed"
);
}
for skip in [
"notes.txt",
"image.png",
"Cargo.lock",
"data.csv",
"config.toml",
] {
assert!(
!watcher_path_is_callgraph_indexed(Path::new(skip)),
"{skip} should not be callgraph-indexed"
);
}
}
fn install_semantic_refresh_channels(
ctx: &AppContext,
) -> (
crossbeam_channel::Receiver<SemanticRefreshRequest>,
crossbeam_channel::Sender<SemanticRefreshEvent>,
) {
let (request_tx, request_rx) = crossbeam_channel::unbounded();
let (event_tx, event_rx) = crossbeam_channel::unbounded();
let worker_slot: SemanticRefreshWorkerSlot =
std::sync::Arc::new(std::sync::Mutex::new(None));
ctx.install_semantic_refresh_worker(request_tx, event_rx, worker_slot);
(request_rx, event_tx)
}
fn transient_embedding_error() -> String {
format!(
"{}backend unavailable",
aft::semantic_index::TRANSIENT_EMBEDDING_MARKER
)
}
fn recv_files_request(
request_rx: &crossbeam_channel::Receiver<SemanticRefreshRequest>,
) -> Vec<std::path::PathBuf> {
match request_rx
.recv_timeout(RECV_DISPATCH_TIMEOUT)
.expect("semantic refresh request")
{
SemanticRefreshRequest::Files { paths } => paths,
SemanticRefreshRequest::Corpus => panic!("unexpected corpus refresh"),
}
}
fn status_frame_rx(ctx: &AppContext) -> std::sync::mpsc::Receiver<PushFrame> {
let (tx, rx) = std::sync::mpsc::channel();
ctx.set_progress_sender(Some(std::sync::Arc::new(Box::new(move |frame| {
let _ = tx.send(frame);
}))));
rx
}
fn recv_status_changed(rx: &std::sync::mpsc::Receiver<PushFrame>) -> serde_json::Value {
match rx
.recv_timeout(RECV_DISPATCH_TIMEOUT)
.expect("status_changed frame")
{
PushFrame::StatusChanged(frame) => frame.snapshot,
other => panic!("unexpected frame: {other:?}"),
}
}
fn err_diag(file: &std::path::Path) -> StoredDiagnostic {
StoredDiagnostic {
file: file.to_path_buf(),
line: 1,
column: 1,
end_line: 1,
end_column: 2,
severity: DiagnosticSeverity::Error,
message: "boom".into(),
code: None,
source: None,
}
}
fn semantic_breaker_test_lock() -> &'static std::sync::Mutex<()> {
use std::sync::{Mutex, OnceLock};
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
}
fn with_semantic_retry_backoff_ms<R>(ms: u64, f: impl FnOnce() -> R) -> R {
let _guard = semantic_breaker_test_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let previous = std::env::var_os("AFT_SEMANTIC_RETRY_BACKOFF_MS");
unsafe {
std::env::set_var("AFT_SEMANTIC_RETRY_BACKOFF_MS", ms.to_string());
}
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
unsafe {
match previous {
Some(value) => std::env::set_var("AFT_SEMANTIC_RETRY_BACKOFF_MS", value),
None => std::env::remove_var("AFT_SEMANTIC_RETRY_BACKOFF_MS"),
}
}
match result {
Ok(value) => value,
Err(payload) => std::panic::resume_unwind(payload),
}
}
fn with_neutralized_global_gitignore<R>(f: impl FnOnce() -> R) -> R {
use std::sync::{Mutex, OnceLock};
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
let _guard = LOCK
.get_or_init(|| Mutex::new(()))
.lock()
.unwrap_or_else(|e| e.into_inner());
let tmp = TempDir::new().unwrap();
let prev = std::env::var_os("XDG_CONFIG_HOME");
unsafe {
std::env::set_var("XDG_CONFIG_HOME", tmp.path());
}
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
unsafe {
match prev {
Some(v) => std::env::set_var("XDG_CONFIG_HOME", v),
None => std::env::remove_var("XDG_CONFIG_HOME"),
}
}
match result {
Ok(r) => r,
Err(p) => std::panic::resume_unwind(p),
}
}
#[test]
fn watcher_drain_refreshes_open_callgraph_store() {
let tmp = TempDir::new().unwrap();
let root = tmp.path();
let source = root.join("main.ts");
std::fs::write(
&source,
"export function entry() { oldLeaf(); }\nfunction oldLeaf() {}\nfunction newLeaf() {}\n",
)
.unwrap();
let ctx = AppContext::new(
Box::new(TreeSitterProvider::new()),
Config {
project_root: Some(root.to_path_buf()),
storage_dir: Some(root.join("storage")),
callgraph_store: true,
..Config::default()
},
);
ctx.set_harness(Harness::Opencode);
ctx.set_canonical_cache_root(root.to_path_buf());
ctx.set_cache_role(false, None);
ctx.rebuild_gitignore();
let tx = install_watcher_rx(&ctx);
{
let store = ctx
.ensure_callgraph_store()
.unwrap()
.expect("store should build on demand");
let tree = store
.call_tree(std::path::Path::new("main.ts"), "entry", 1)
.unwrap();
assert_eq!(tree.children[0].name, "oldLeaf");
}
std::fs::write(
&source,
"export function entry() { newLeaf(); }\nfunction oldLeaf() {}\nfunction newLeaf() {}\n",
)
.unwrap();
let mut paths = vec![source];
paths.extend((1..WATCHER_BATCH_INLINE_CAP).map(|i| root.join(format!("note-{i}.txt"))));
tx.send(WatcherDispatchEvent::Paths(paths)).unwrap();
drain_watcher_events(&ctx);
let store = {
let guard = ctx
.callgraph_store()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard
.as_ref()
.map(std::sync::Arc::clone)
.expect("store remains open")
};
let tree = store
.call_tree(std::path::Path::new("main.ts"), "entry", 1)
.unwrap();
assert_eq!(tree.children[0].name, "newLeaf");
}
#[test]
fn create_and_remove_invalidate() {
assert!(watcher_event_invalidates(&EventKind::Create(
CreateKind::File
)));
assert!(watcher_event_invalidates(&EventKind::Remove(
RemoveKind::File
)));
}
#[test]
fn modify_data_invalidates() {
assert!(watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Data(DataChange::Content)
)));
assert!(watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Data(DataChange::Any)
)));
}
#[test]
fn modify_name_rename_invalidates() {
assert!(watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Name(RenameMode::To)
)));
assert!(watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Name(RenameMode::From)
)));
assert!(watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Name(RenameMode::Both)
)));
}
#[test]
fn modify_metadata_writetime_invalidates() {
assert!(watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Metadata(MetadataKind::WriteTime)
)));
}
#[test]
fn modify_metadata_any_or_other_invalidates() {
assert!(watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Metadata(MetadataKind::Any)
)));
assert!(watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Metadata(MetadataKind::Other)
)));
}
#[test]
fn modify_metadata_access_time_does_not_invalidate() {
assert!(!watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Metadata(MetadataKind::AccessTime)
)));
}
#[test]
fn modify_metadata_permissions_ownership_extended_do_not_invalidate() {
assert!(!watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Metadata(MetadataKind::Permissions)
)));
assert!(!watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Metadata(MetadataKind::Ownership)
)));
assert!(!watcher_event_invalidates(&EventKind::Modify(
ModifyKind::Metadata(MetadataKind::Extended)
)));
}
#[test]
fn access_events_do_not_invalidate() {
assert!(!watcher_event_invalidates(&EventKind::Access(
AccessKind::Open(AccessMode::Read)
)));
assert!(!watcher_event_invalidates(&EventKind::Access(
AccessKind::Read
)));
assert!(!watcher_event_invalidates(&EventKind::Access(
AccessKind::Close(AccessMode::Read)
)));
}
#[test]
fn other_event_kinds_do_not_invalidate() {
assert!(!watcher_event_invalidates(&EventKind::Other));
assert!(!watcher_event_invalidates(&EventKind::Any));
}
#[test]
fn dispatch_panic_response_is_clear_internal_error() {
let payload: Box<dyn std::any::Any + Send> = Box::new("boom");
let response = dispatch_panic_response("panic-id", "db_get_state", payload.as_ref());
assert!(!response.success);
assert_eq!(response.data["code"], "internal_error");
assert!(response.data["message"]
.as_str()
.unwrap()
.contains("command 'db_get_state' panicked: boom"));
}
#[test]
fn dispatch_rejects_agent_safety_commands_when_backups_disabled() {
let temp = TempDir::new().unwrap();
let mut config = Config {
project_root: Some(temp.path().to_path_buf()),
..Config::default()
};
config.backup = BackupConfig {
enabled: Some(false),
..BackupConfig::default()
};
let ctx = AppContext::new(Box::new(TreeSitterProvider::new()), config);
for command in [
"undo",
"undo_preview",
"edit_history",
"checkpoint",
"checkpoint_paths",
"restore_checkpoint",
"list_checkpoints",
] {
let response = dispatch(
RawRequest {
id: command.to_string(),
command: command.to_string(),
params: serde_json::json!({}),
session_id: None,
lsp_hints: None,
},
&ctx,
);
assert!(!response.success, "{command} unexpectedly succeeded");
assert_eq!(response.data["code"], "backups_disabled");
}
}
#[test]
fn push_frame_write_error_requests_shutdown() {
struct BrokenWriter;
impl std::io::Write for BrokenWriter {
fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"stdout closed",
))
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
let shutdown = std::sync::atomic::AtomicBool::new(false);
let frame =
PushFrame::ConfigureWarnings(ConfigureWarningsFrame::new("/repo", 0, Vec::new()));
write_push_frame_or_request_shutdown(&mut BrokenWriter, &frame, &shutdown);
assert!(shutdown.load(std::sync::atomic::Ordering::SeqCst));
}
#[test]
fn status_bar_attach_skips_unchanged_fingerprint() {
let tmp = TempDir::new().unwrap();
let ctx = make_ctx_with_root(tmp.path());
ctx.update_status_bar_tier2(Some(1), Some(2), Some(3), Some(4), false);
let mut first = Response::success("one", serde_json::json!({}));
attach_status_bar(&mut first, &ctx, "read");
assert_eq!(first.data["status_bar"]["dead_code"], 1);
assert_eq!(first.data["status_bar"]["unused_exports"], 2);
assert_eq!(first.data["status_bar"]["duplicates"], 3);
assert_eq!(first.data["status_bar"]["todos"], 4);
let mut unchanged = Response::success("two", serde_json::json!({}));
attach_status_bar(&mut unchanged, &ctx, "read");
assert!(unchanged.data.get("status_bar").is_none());
ctx.update_status_bar_tier2(Some(5), Some(2), Some(3), Some(4), false);
let mut changed = Response::success("three", serde_json::json!({}));
attach_status_bar(&mut changed, &ctx, "read");
assert_eq!(changed.data["status_bar"]["dead_code"], 5);
}
#[test]
fn configure_warning_drain_drops_stale_generation() {
let tmp = TempDir::new().unwrap();
let ctx = make_ctx_with_root(tmp.path());
let (frame_tx, frame_rx) = std::sync::mpsc::channel();
ctx.set_progress_sender(Some(std::sync::Arc::new(Box::new(move |frame| {
let _ = frame_tx.send(frame);
}))));
let warnings_tx = ctx.configure_warnings_sender();
let current_generation = ctx.advance_configure_generation();
warnings_tx
.send((
current_generation - 1,
ConfigureWarningsFrame::new("/stale", 1, Vec::new()),
))
.unwrap();
warnings_tx
.send((
current_generation,
ConfigureWarningsFrame::new("/current", 2, Vec::new()),
))
.unwrap();
aft::runtime_drain::drain_configure_warning_events(&ctx);
let frame = frame_rx.try_recv().expect("current warning frame");
match frame {
PushFrame::ConfigureWarnings(frame) => {
assert_eq!(frame.project_root, "/current");
assert_eq!(frame.source_file_count, 2);
}
other => panic!("unexpected frame: {other:?}"),
}
assert!(frame_rx.try_recv().is_err());
}
#[test]
fn gitignore_write_rebuilds_before_filtering_same_batch_paths() {
let tmp = TempDir::new().unwrap();
let root = tmp.path();
let gitignore = root.join(".gitignore");
let ignored = root.join("foo.txt");
let kept = root.join("bar.txt");
std::fs::write(&ignored, "ignored").unwrap();
std::fs::write(&kept, "kept").unwrap();
let ctx = make_ctx_with_root(root);
with_neutralized_global_gitignore(|| ctx.rebuild_gitignore());
assert!(ctx.gitignore().is_none());
std::fs::write(&gitignore, "foo.txt\n").unwrap();
with_neutralized_global_gitignore(|| ctx.rebuild_gitignore());
let changed =
filter_watcher_raw_paths(&ctx, vec![gitignore.clone(), ignored.clone(), kept.clone()]);
let gitignore = std::fs::canonicalize(gitignore).unwrap();
let ignored = std::fs::canonicalize(ignored).unwrap();
let kept = std::fs::canonicalize(kept).unwrap();
assert!(changed.ignore_file_changed);
assert!(changed.changed.contains(&gitignore));
assert!(!changed.changed.contains(&ignored));
assert!(changed.changed.contains(&kept));
}
#[test]
fn infra_ignore_file_does_not_request_corpus_refresh_but_project_aftignore_does() {
let tmp = TempDir::new().unwrap();
let root = tmp.path();
let infra_gitignore = root.join("node_modules").join("pkg").join(".gitignore");
std::fs::create_dir_all(infra_gitignore.parent().unwrap()).unwrap();
std::fs::write(&infra_gitignore, "dist/\n").unwrap();
let ctx = make_ctx_with_root(root);
let changed = filter_watcher_raw_paths(&ctx, vec![infra_gitignore]);
assert!(!changed.ignore_file_changed);
assert!(changed.changed.is_empty());
let aftignore = root.join(".aftignore");
std::fs::write(&aftignore, "ignored/\n").unwrap();
let changed = filter_watcher_raw_paths(&ctx, vec![aftignore.clone()]);
let aftignore = std::fs::canonicalize(aftignore).unwrap();
assert!(changed.ignore_file_changed);
assert!(changed.changed.contains(&aftignore));
}
#[test]
fn project_git_info_exclude_requests_corpus_refresh_without_indexing_git_dir() {
let tmp = TempDir::new().unwrap();
let root = tmp.path();
let git_info = root.join(".git").join("info");
std::fs::create_dir_all(&git_info).unwrap();
let exclude = git_info.join("exclude");
std::fs::write(&exclude, "ignored/\n").unwrap();
let ctx = make_ctx_with_root(root);
let changed = filter_watcher_raw_paths(&ctx, vec![exclude]);
assert!(changed.ignore_file_changed);
assert!(changed.changed.is_empty());
}
#[test]
fn shared_git_info_exclude_requests_corpus_refresh_without_indexing_external_file() {
let tmp = TempDir::new().unwrap();
let root = tmp.path();
let common = TempDir::new().unwrap();
let git_info = common.path().join("info");
std::fs::create_dir_all(&git_info).unwrap();
let exclude = git_info.join("exclude");
std::fs::write(&exclude, "ignored/\n").unwrap();
let ctx = make_ctx_with_root(root);
ctx.set_cache_role(false, Some(common.path().to_path_buf()));
let changed = filter_watcher_raw_paths(&ctx, vec![exclude]);
assert!(changed.ignore_file_changed);
assert!(changed.changed.is_empty());
}
#[test]
fn semantic_index_disconnect_without_terminal_event_marks_failed() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let ctx = make_ctx_with_root(&root);
let (tx, rx) = crossbeam_channel::unbounded::<SemanticIndexEvent>();
*ctx.semantic_index_rx().lock() = Some(rx);
*ctx.semantic_index_status()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::Building {
stage: "embedding".into(),
files: Some(1),
entries_done: Some(0),
entries_total: Some(1),
};
drop(tx);
aft::runtime_drain::drain_semantic_index_events(&ctx);
assert!(ctx.semantic_index_rx().lock().is_none());
assert!(matches!(
&*ctx.semantic_index_status()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner),
SemanticIndexStatus::Failed(message)
if message.contains("disconnected before reporting completion")
));
}
#[test]
fn semantic_refresh_disconnect_after_started_cancels_refreshing_paths() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let file = root.join("lib.rs");
std::fs::write(&file, "fn main() {}\n").unwrap();
let ctx = make_ctx_with_root(&root);
*ctx.semantic_index_status()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::ready();
let (_request_rx, event_tx) = install_semantic_refresh_channels(&ctx);
event_tx
.send(SemanticRefreshEvent::Started {
paths: vec![file.clone()],
})
.unwrap();
drop(event_tx);
drain_semantic_refresh_events(&ctx);
assert!(ctx.semantic_refresh_event_rx().lock().is_none());
assert_eq!(
ctx.semantic_index_status()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.refreshing_count(),
0
);
}
#[test]
fn watcher_overflow_rescan_reschedules_callgraph_store_instead_of_blocking() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
std::fs::write(root.join("lib.rs"), "fn used() {}\nfn main() { used(); }\n").unwrap();
let ctx = AppContext::new(
Box::new(TreeSitterProvider::new()),
Config {
project_root: Some(root.clone()),
storage_dir: Some(tmp.path().join("storage")),
callgraph_store: true,
..Config::default()
},
);
ctx.set_canonical_cache_root(root.clone());
ctx.rebuild_gitignore();
let resident = ctx
.ensure_callgraph_store()
.expect("ensure callgraph store");
assert!(resident.is_some(), "callgraph store should be resident");
drop(resident);
assert!(ctx
.callgraph_store()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.is_some());
let watcher_tx = install_watcher_rx(&ctx);
watcher_tx
.send(WatcherDispatchEvent::RescanRequired)
.unwrap();
drain_watcher_events(&ctx);
assert!(
ctx.callgraph_store()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.is_none(),
"watcher overflow must drop the resident store to reschedule, not refresh inline"
);
assert!(
ctx.callgraph_store_rx().lock().is_none(),
"drain must not start a synchronous/inline callgraph build"
);
}
#[test]
fn watcher_large_batch_reschedules_indexes_instead_of_inline_refresh() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
std::fs::write(
root.join("main.ts"),
"export function entry() { oldLeaf(); }\nfunction oldLeaf() {}\n",
)
.unwrap();
let ctx = AppContext::new(
Box::new(TreeSitterProvider::new()),
Config {
project_root: Some(root.clone()),
storage_dir: Some(tmp.path().join("storage")),
callgraph_store: true,
search_index: true,
semantic_search: true,
..Config::default()
},
);
ctx.set_canonical_cache_root(root.clone());
ctx.rebuild_gitignore();
let resident = ctx
.ensure_callgraph_store()
.expect("ensure callgraph store")
.expect("callgraph store should build on demand");
drop(resident);
assert!(ctx
.callgraph_store()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.is_some());
let mut search_index = aft::search_index::SearchIndex::new();
search_index.ready = true;
*ctx.search_index()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(search_index);
*ctx.semantic_index()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) =
Some(SemanticIndex::new(root.clone(), 3));
*ctx.semantic_index_status()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::ready();
let (request_rx, _event_tx) = install_semantic_refresh_channels(&ctx);
let watcher_tx = install_watcher_rx(&ctx);
let paths = (0..=WATCHER_BATCH_INLINE_CAP)
.map(|i| root.join(format!("changed-{i}.ts")))
.collect::<Vec<_>>();
watcher_tx.send(WatcherDispatchEvent::Paths(paths)).unwrap();
drain_watcher_events(&ctx);
assert!(
ctx.callgraph_store()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.is_none(),
"oversized watcher batch must drop the resident store instead of refreshing inline"
);
assert!(
ctx.callgraph_store_rx().lock().is_none(),
"drain must not start a callgraph cold build on the dispatch thread"
);
assert!(
matches!(
ctx.callgraph_store_for_ops(),
CallgraphStoreAccess::Building
),
"next callgraph op should start the forced background rebuild and return Building"
);
assert!(
ctx.search_index_rx()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.is_some(),
"oversized watcher batch should spawn a background search corpus refresh"
);
assert!(
!ctx.search_index()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.expect("resident search index")
.ready,
"resident search index should be marked not-ready while the corpus refresh runs"
);
match request_rx
.recv_timeout(RECV_DISPATCH_TIMEOUT)
.expect("semantic corpus refresh request")
{
SemanticRefreshRequest::Corpus => {}
SemanticRefreshRequest::Files { paths } => {
panic!("expected semantic corpus refresh for oversized batch, got {paths:?}")
}
}
assert!(request_rx
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
}
#[test]
fn watcher_error_clears_receiver_and_marks_degraded() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let ctx = make_ctx_with_root(&root);
let watcher_tx = install_watcher_rx(&ctx);
watcher_tx
.send(WatcherDispatchEvent::Error(
"watcher init failed".to_string(),
))
.unwrap();
drain_watcher_events(&ctx);
assert!(ctx.watcher_rx().lock().is_none());
assert!(ctx
.degraded_reasons()
.contains(&"watcher_unavailable".to_string()));
let status = ctx.build_status_snapshot();
assert!(status["degraded"].as_bool().unwrap_or(false));
assert_eq!(
status["degraded_reasons"],
serde_json::json!(["watcher_unavailable"])
);
}
#[test]
fn project_root_deleted_control_drops_watcher_rx_and_marks_degraded() {
let tmp = TempDir::new().unwrap();
let root = tmp.path().to_path_buf();
let ctx = make_ctx_with_root(&root);
ctx.set_canonical_cache_root(root);
let watcher_tx = install_watcher_rx(&ctx);
watcher_tx.send(WatcherDispatchEvent::RootDeleted).unwrap();
drain_watcher_events(&ctx);
assert!(ctx.watcher_rx().lock().is_none());
assert!(ctx
.degraded_reasons()
.contains(&"project_root_deleted".to_string()));
}
#[test]
fn watcher_semantic_refresh_includes_vue_extension() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let file = root.join("App.vue");
std::fs::write(&file, "<script setup>const n = 1;</script>").unwrap();
let ctx = make_ctx_with_root(&root);
*ctx.semantic_index()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) =
Some(SemanticIndex::new(root.clone(), 3));
*ctx.semantic_index_status()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::ready();
let (request_rx, _event_tx) = install_semantic_refresh_channels(&ctx);
let watcher_tx = install_watcher_rx(&ctx);
watcher_tx.send(watcher_paths_event(file.clone())).unwrap();
drain_watcher_events(&ctx);
match request_rx
.recv_timeout(RECV_DISPATCH_TIMEOUT)
.expect("semantic refresh request")
{
SemanticRefreshRequest::Files { paths } => assert_eq!(paths, vec![file]),
SemanticRefreshRequest::Corpus => panic!("unexpected corpus refresh"),
}
}
#[test]
fn watcher_rescan_required_coalesces_and_requests_one_corpus_refresh() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let file = root.join("lib.rs");
std::fs::write(&file, "fn main() {}\n").unwrap();
let ctx = AppContext::new(
Box::new(TreeSitterProvider::new()),
Config {
project_root: Some(root.clone()),
semantic_search: true,
..Config::default()
},
);
ctx.set_canonical_cache_root(root.clone());
ctx.rebuild_gitignore();
*ctx.semantic_index()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(SemanticIndex::new(root, 3));
*ctx.semantic_index_status()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::ready();
let (request_rx, _event_tx) = install_semantic_refresh_channels(&ctx);
let watcher_tx = install_watcher_rx(&ctx);
watcher_tx.send(watcher_paths_event(file.clone())).unwrap();
watcher_tx
.send(WatcherDispatchEvent::RescanRequired)
.unwrap();
watcher_tx
.send(WatcherDispatchEvent::RescanRequired)
.unwrap();
watcher_tx.send(watcher_paths_event(file)).unwrap();
drain_watcher_events(&ctx);
match request_rx
.recv_timeout(RECV_DISPATCH_TIMEOUT)
.expect("semantic corpus refresh request")
{
SemanticRefreshRequest::Corpus => {}
SemanticRefreshRequest::Files { paths } => {
panic!("expected corpus refresh, got file refresh for {paths:?}")
}
}
assert!(request_rx
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
}
#[test]
fn transient_file_refresh_failure_requeues_retry_without_completing() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let file = root.join("lib.rs");
std::fs::write(&file, "fn main() {}").unwrap();
with_semantic_retry_backoff_ms(1, || {
let ctx = make_ctx_with_root(&root);
let (request_rx, event_tx) = install_semantic_refresh_channels(&ctx);
let mut status = SemanticIndexStatus::ready();
status.add_refreshing_file(file.clone());
status.start_refreshing_file(file.clone());
*ctx.semantic_index_status()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = status;
event_tx
.send(SemanticRefreshEvent::Failed {
paths: vec![file.clone()],
error: format!(
"{}backend unavailable",
aft::semantic_index::TRANSIENT_EMBEDDING_MARKER
),
})
.unwrap();
drain_semantic_refresh_events(&ctx);
match request_rx
.recv_timeout(RECV_DISPATCH_TIMEOUT)
.expect("retry request")
{
SemanticRefreshRequest::Files { paths } => assert_eq!(paths, vec![file.clone()]),
SemanticRefreshRequest::Corpus => panic!("unexpected corpus refresh"),
}
assert_eq!(
ctx.semantic_index_status()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.refreshing_count(),
1
);
});
}
#[test]
fn semantic_refresh_breaker_coalesces_open_retries_into_single_probe() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let files = (0..(BREAKER_TRIP_THRESHOLD + 2))
.map(|index| root.join(format!("file{index}.rs")))
.collect::<Vec<_>>();
for file in &files {
std::fs::write(file, "fn main() {}\n").unwrap();
}
with_semantic_retry_backoff_ms(1, || {
let ctx = make_ctx_with_root(&root);
let (request_rx, event_tx) = install_semantic_refresh_channels(&ctx);
for file in files.iter().take(BREAKER_TRIP_THRESHOLD) {
event_tx
.send(SemanticRefreshEvent::Failed {
paths: vec![file.clone()],
error: transient_embedding_error(),
})
.unwrap();
}
drain_semantic_refresh_events(&ctx);
assert!(semantic_refresh_circuit_is_open(&ctx));
assert!(semantic_refresh_probe_is_scheduled_for_test(&ctx));
for file in files.iter().skip(BREAKER_TRIP_THRESHOLD) {
event_tx
.send(SemanticRefreshEvent::Failed {
paths: vec![file.clone()],
error: transient_embedding_error(),
})
.unwrap();
}
drain_semantic_refresh_events(&ctx);
assert!(semantic_refresh_probe_is_scheduled_for_test(&ctx));
let expected_probe_paths = files[(BREAKER_TRIP_THRESHOLD - 1)..].to_vec();
let pending = ctx.take_pending_semantic_index_paths();
assert_eq!(pending, expected_probe_paths);
ctx.add_pending_semantic_index_paths(pending);
std::thread::sleep(std::time::Duration::from_millis(20));
drain_semantic_refresh_events(&ctx);
let mut batches = Vec::new();
for _ in 0..BREAKER_TRIP_THRESHOLD {
batches.push(recv_files_request(&request_rx));
}
for file in files.iter().take(BREAKER_TRIP_THRESHOLD - 1) {
assert!(batches.contains(&vec![file.clone()]));
}
assert!(batches.contains(&expected_probe_paths));
assert!(request_rx
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
});
}
#[test]
fn semantic_refresh_circuit_breaker_is_per_app_context() {
let root_a = TempDir::new().unwrap();
let root_b = TempDir::new().unwrap();
let ctx_a = make_ctx_with_root(root_a.path());
let ctx_b = make_ctx_with_root(root_b.path());
for _ in 0..BREAKER_TRIP_THRESHOLD {
record_semantic_refresh_transient_failure(&ctx_a);
}
assert!(semantic_refresh_circuit_is_open(&ctx_a));
assert_eq!(
semantic_refresh_transient_failure_count_for_test(&ctx_a),
BREAKER_TRIP_THRESHOLD
);
assert!(!semantic_refresh_circuit_is_open(&ctx_b));
assert_eq!(semantic_refresh_transient_failure_count_for_test(&ctx_b), 0);
assert!(!semantic_refresh_probe_is_scheduled_for_test(&ctx_b));
}
#[test]
fn semantic_refresh_success_resets_breaker_and_next_failure_starts_fresh() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let files = (0..(BREAKER_TRIP_THRESHOLD + 1))
.map(|index| root.join(format!("reset{index}.rs")))
.collect::<Vec<_>>();
for file in &files {
std::fs::write(file, "fn main() {}\n").unwrap();
}
with_semantic_retry_backoff_ms(1, || {
let ctx = make_ctx_with_root(&root);
let (request_rx, event_tx) = install_semantic_refresh_channels(&ctx);
for file in files.iter().take(BREAKER_TRIP_THRESHOLD) {
event_tx
.send(SemanticRefreshEvent::Failed {
paths: vec![file.clone()],
error: transient_embedding_error(),
})
.unwrap();
}
drain_semantic_refresh_events(&ctx);
assert!(semantic_refresh_circuit_is_open(&ctx));
let mut retry_batches = Vec::new();
for _ in 0..(BREAKER_TRIP_THRESHOLD - 1) {
retry_batches.push(recv_files_request(&request_rx));
}
for file in files.iter().take(BREAKER_TRIP_THRESHOLD - 1) {
assert!(
retry_batches.contains(&vec![file.clone()]),
"missing retry for {file:?}; got {retry_batches:?}"
);
}
event_tx
.send(SemanticRefreshEvent::Completed {
added_entries: Vec::new(),
updated_metadata: Vec::new(),
completed_paths: vec![files[BREAKER_TRIP_THRESHOLD - 1].clone()],
})
.unwrap();
drain_semantic_refresh_events(&ctx);
assert!(!semantic_refresh_circuit_is_open(&ctx));
assert_eq!(semantic_refresh_transient_failure_count_for_test(&ctx), 0);
let fresh_file = files[BREAKER_TRIP_THRESHOLD].clone();
event_tx
.send(SemanticRefreshEvent::Failed {
paths: vec![fresh_file.clone()],
error: transient_embedding_error(),
})
.unwrap();
drain_semantic_refresh_events(&ctx);
assert!(!semantic_refresh_circuit_is_open(&ctx));
assert_eq!(semantic_refresh_transient_failure_count_for_test(&ctx), 1);
assert_eq!(recv_files_request(&request_rx), vec![fresh_file]);
});
}
#[test]
fn semantic_refresh_retry_attempt_cap_stashes_path_without_hot_timer() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let file = root.join("cap.rs");
std::fs::write(&file, "fn main() {}\n").unwrap();
with_semantic_retry_backoff_ms(1, || {
let ctx = make_ctx_with_root(&root);
let (request_rx, _event_tx) = install_semantic_refresh_channels(&ctx);
let error = transient_embedding_error();
for _ in 0..MAX_RETRY_ATTEMPTS {
assert!(schedule_semantic_refresh_retry(
&ctx,
vec![file.clone()],
&error
));
assert_eq!(recv_files_request(&request_rx), vec![file.clone()]);
}
assert!(schedule_semantic_refresh_retry(
&ctx,
vec![file.clone()],
&error
));
assert!(request_rx
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
assert_eq!(ctx.take_pending_semantic_index_paths(), vec![file]);
});
}
#[test]
fn transient_corpus_failure_preserves_pending_semantic_paths() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let file = root.join("pending.vue");
std::fs::write(&file, "<template />").unwrap();
let ctx = make_ctx_with_root(&root);
*ctx.semantic_index()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) =
Some(SemanticIndex::new(root.clone(), 3));
*ctx.semantic_index_status()
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::Building {
stage: "refreshing_corpus".into(),
files: Some(1),
entries_done: None,
entries_total: None,
};
ctx.add_pending_semantic_index_paths(vec![file.clone()]);
let (_request_rx, event_tx) = install_semantic_refresh_channels(&ctx);
event_tx
.send(SemanticRefreshEvent::CorpusFailed {
error: format!(
"{}backend unavailable",
aft::semantic_index::TRANSIENT_EMBEDDING_MARKER
),
})
.unwrap();
drain_semantic_refresh_events(&ctx);
let pending = ctx.take_pending_semantic_index_paths();
assert_eq!(pending, vec![file]);
assert!(matches!(
&*ctx
.semantic_index_status()
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner),
SemanticIndexStatus::Ready { .. }
));
}
#[test]
fn watcher_stale_mark_emits_status_without_semantic_change() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let file = root.join("notes.txt");
std::fs::write(&file, "changed").unwrap();
let ctx = make_ctx_with_root(&root);
ctx.update_status_bar_tier2(Some(1), Some(2), Some(3), Some(4), false);
let rx = status_frame_rx(&ctx);
let watcher_tx = install_watcher_rx(&ctx);
watcher_tx.send(watcher_paths_event(file)).unwrap();
drain_watcher_events(&ctx);
let snapshot = recv_status_changed(&rx);
assert_eq!(
snapshot["status_bar"]["tier2_stale"],
serde_json::Value::Bool(true)
);
}
#[test]
fn watcher_diagnostics_clear_emits_status_without_semantic_change() {
let tmp = TempDir::new().unwrap();
let root = std::fs::canonicalize(tmp.path()).unwrap();
let file = root.join("gone.txt");
std::fs::write(&file, "deleted").unwrap();
let ctx = make_ctx_with_root(&root);
ctx.update_status_bar_tier2(Some(1), Some(2), Some(3), Some(4), true);
{
let key = ServerKey {
kind: ServerKind::TypeScript,
root: root.clone(),
};
let mut lsp = ctx.lsp();
lsp.diagnostics_store_mut_for_test()
.publish(key, file.clone(), vec![err_diag(&file)]);
}
assert_eq!(ctx.status_bar_counts().unwrap().errors, 1);
std::fs::remove_file(&file).unwrap();
let rx = status_frame_rx(&ctx);
let watcher_tx = install_watcher_rx(&ctx);
watcher_tx.send(watcher_paths_event(file)).unwrap();
drain_watcher_events(&ctx);
let snapshot = recv_status_changed(&rx);
assert_eq!(snapshot["status_bar"]["errors"], serde_json::Value::from(0));
assert_eq!(ctx.status_bar_counts().unwrap().errors, 0);
}
}