agent-file-tools 0.28.2

Agent File Tools — tree-sitter powered code analysis for AI agents
Documentation
#![cfg(unix)]

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use aft::bash_background::persistence::{read_task, task_paths, PersistedTask};
use aft::bash_background::{BgTaskRegistry, BgTaskStatus};
use aft::db::bash_tasks::{upsert_bash_task, BashTaskRow};
use aft::harness::Harness;
use rusqlite::Connection;

const SESSION: &str = "dual-write-session";

#[derive(Debug)]
struct DbTaskRow {
    harness: String,
    session_id: String,
    task_id: String,
    project_key: String,
    command: String,
    cwd: String,
    status: String,
    exit_code: Option<i32>,
    pid: Option<i64>,
    pgid: Option<i64>,
    started_at: i64,
    completed_at: Option<i64>,
    stdout_path: Option<String>,
    stderr_path: Option<String>,
    compressed: bool,
    timeout_ms: Option<i64>,
    completion_delivered: bool,
    output_bytes: Option<i64>,
    metadata: String,
}

fn registry_with_db(storage: &Path, harness: Harness) -> (BgTaskRegistry, Arc<Mutex<Connection>>) {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    registry.set_harness(harness);
    let conn = aft::db::open(&storage.join("aft.db")).expect("open test DB");
    let shared = Arc::new(Mutex::new(conn));
    registry.set_db_pool(shared.clone());
    (registry, shared)
}

fn spawn_task(registry: &BgTaskRegistry, storage: &Path, project: &Path, command: &str) -> String {
    registry
        .spawn(
            command,
            SESSION.to_string(),
            project.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            storage.to_path_buf(),
            16,
            true,
            true,
            Some(project.to_path_buf()),
        )
        .expect("spawn background task")
}

fn wait_for_status(
    conn: &Arc<Mutex<Connection>>,
    harness: &str,
    session_id: &str,
    task_id: &str,
    expected: &str,
) -> DbTaskRow {
    let started = Instant::now();
    loop {
        if let Some(row) = fetch_row(conn, harness, session_id, task_id) {
            if row.status == expected {
                return row;
            }
        }
        assert!(
            started.elapsed() < Duration::from_secs(8),
            "timed out waiting for DB status {expected}"
        );
        std::thread::sleep(Duration::from_millis(50));
    }
}

fn wait_for_json_status(storage: &Path, session_id: &str, task_id: &str, expected: BgTaskStatus) {
    let paths = task_paths(storage, session_id, task_id);
    let started = Instant::now();
    loop {
        if let Ok(task) = read_task(&paths.json) {
            if task.status == expected {
                return;
            }
        }
        assert!(
            started.elapsed() < Duration::from_secs(8),
            "timed out waiting for JSON status {expected:?}"
        );
        std::thread::sleep(Duration::from_millis(50));
    }
}

fn fetch_row(
    conn: &Arc<Mutex<Connection>>,
    harness: &str,
    session_id: &str,
    task_id: &str,
) -> Option<DbTaskRow> {
    let conn = conn.lock().expect("DB lock");
    conn.query_row(
        "SELECT harness, session_id, task_id, project_key, command, cwd, status,
                exit_code, pid, pgid, started_at, completed_at, stdout_path, stderr_path,
                compressed, timeout_ms, completion_delivered, output_bytes, metadata
         FROM bash_tasks
         WHERE harness = ?1 AND session_id = ?2 AND task_id = ?3",
        rusqlite::params![harness, session_id, task_id],
        |row| {
            Ok(DbTaskRow {
                harness: row.get(0)?,
                session_id: row.get(1)?,
                task_id: row.get(2)?,
                project_key: row.get(3)?,
                command: row.get(4)?,
                cwd: row.get(5)?,
                status: row.get(6)?,
                exit_code: row.get(7)?,
                pid: row.get(8)?,
                pgid: row.get(9)?,
                started_at: row.get(10)?,
                completed_at: row.get(11)?,
                stdout_path: row.get(12)?,
                stderr_path: row.get(13)?,
                compressed: row.get::<_, i64>(14)? != 0,
                timeout_ms: row.get(15)?,
                completion_delivered: row.get::<_, i64>(16)? != 0,
                output_bytes: row.get(17)?,
                metadata: row.get(18)?,
            })
        },
    )
    .ok()
}

fn assert_row_matches_task(row: &DbTaskRow, task: &PersistedTask, project: &Path, storage: &Path) {
    let paths = task_paths(storage, &task.session_id, &task.task_id);
    assert_eq!(row.harness, "opencode");
    assert_eq!(row.session_id, task.session_id);
    assert_eq!(row.task_id, task.task_id);
    assert_eq!(
        row.project_key,
        aft::search_index::project_cache_key(project)
    );
    assert_eq!(row.command, task.command);
    assert_eq!(row.cwd, task.workdir.display().to_string());
    assert_eq!(row.exit_code, task.exit_code);
    assert_eq!(row.pid, task.child_pid.map(i64::from));
    assert_eq!(row.pgid, task.pgid.map(i64::from));
    assert_eq!(row.started_at, task.started_at as i64);
    assert_eq!(row.completed_at, task.finished_at.map(|value| value as i64));
    assert_eq!(
        row.stdout_path.as_deref(),
        Some(paths.stdout.to_str().unwrap())
    );
    assert_eq!(
        row.stderr_path.as_deref(),
        Some(paths.stderr.to_str().unwrap())
    );
    assert_eq!(row.compressed, task.compressed);
    assert_eq!(row.timeout_ms, task.timeout_ms.map(|value| value as i64));
    assert_eq!(row.completion_delivered, task.completion_delivered);
    assert_eq!(
        serde_json::from_str::<serde_json::Value>(&row.metadata).unwrap(),
        serde_json::to_value(task).unwrap()
    );
}

#[test]
fn bash_tasks_dual_write_spawn_writes_both_json_and_db_row() {
    let project = tempfile::tempdir().unwrap();
    let storage = tempfile::tempdir().unwrap();
    let (registry, conn) = registry_with_db(storage.path(), Harness::Opencode);

    let task_id = spawn_task(&registry, storage.path(), project.path(), "echo dual-write");
    let row = wait_for_status(&conn, "opencode", SESSION, &task_id, "completed");
    let task = read_task(&task_paths(storage.path(), SESSION, &task_id).json).unwrap();

    assert_row_matches_task(&row, &task, project.path(), storage.path());
    registry.detach();
}

#[test]
fn bash_tasks_dual_write_status_transitions_update_db_row() {
    let project = tempfile::tempdir().unwrap();
    let storage = tempfile::tempdir().unwrap();
    let (registry, conn) = registry_with_db(storage.path(), Harness::Opencode);

    let task_id = spawn_task(&registry, storage.path(), project.path(), "sleep 0.2");
    let running = wait_for_status(&conn, "opencode", SESSION, &task_id, "running");
    assert!(running.pid.is_some());
    assert!(running.completed_at.is_none());

    let completed = wait_for_status(&conn, "opencode", SESSION, &task_id, "completed");
    assert_eq!(completed.exit_code, Some(0));
    assert!(completed.pid.is_none());
    assert!(completed.completed_at.is_some());
    registry.detach();
}

#[test]
fn bash_tasks_dual_write_background_task_updates_under_watchdog() {
    let project = tempfile::tempdir().unwrap();
    let storage = tempfile::tempdir().unwrap();
    let (registry, conn) = registry_with_db(storage.path(), Harness::Opencode);

    let task_id = spawn_task(
        &registry,
        storage.path(),
        project.path(),
        "echo watchdog-done",
    );
    let completed = wait_for_status(&conn, "opencode", SESSION, &task_id, "completed");

    assert_eq!(completed.exit_code, Some(0));
    assert!(completed.output_bytes.unwrap_or_default() > 0);
    registry.detach();
}

#[test]
fn bash_tasks_dual_write_db_failure_does_not_break_json_write() {
    let project = tempfile::tempdir().unwrap();
    let storage = tempfile::tempdir().unwrap();
    let (registry, conn) = registry_with_db(storage.path(), Harness::Opencode);
    conn.lock()
        .unwrap()
        .execute("DROP TABLE bash_tasks", [])
        .unwrap();

    let task_id = spawn_task(
        &registry,
        storage.path(),
        project.path(),
        "echo json-survives",
    );
    wait_for_json_status(storage.path(), SESSION, &task_id, BgTaskStatus::Completed);

    assert!(task_paths(storage.path(), SESSION, &task_id).json.exists());
    registry.detach();
}

#[test]
fn bash_tasks_dual_write_output_bytes_propagate_into_db_row() {
    let project = tempfile::tempdir().unwrap();
    let storage = tempfile::tempdir().unwrap();
    let (registry, conn) = registry_with_db(storage.path(), Harness::Opencode);

    let task_id = spawn_task(
        &registry,
        storage.path(),
        project.path(),
        "printf known-output",
    );
    let completed = wait_for_status(&conn, "opencode", SESSION, &task_id, "completed");

    assert_eq!(completed.output_bytes, Some("known-output".len() as i64));
    registry.detach();
}

#[test]
fn bash_tasks_dual_write_upsert_replaces_not_duplicates() {
    let storage = tempfile::tempdir().unwrap();
    let conn = aft::db::open(&storage.path().join("aft.db")).unwrap();
    let mut row = direct_row("opencode", "session", "bash-dupe", "running");
    upsert_bash_task(&conn, &row).unwrap();
    row.status = "completed".to_string();
    row.exit_code = Some(0);
    upsert_bash_task(&conn, &row).unwrap();

    let (count, status): (i64, String) = conn
        .query_row(
            "SELECT COUNT(*), MAX(status) FROM bash_tasks WHERE harness = 'opencode' AND session_id = 'session' AND task_id = 'bash-dupe'",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    assert_eq!(count, 1);
    assert_eq!(status, "completed");
}

#[test]
fn bash_tasks_dual_write_harness_isolation_in_db() {
    let storage = tempfile::tempdir().unwrap();
    let conn = aft::db::open(&storage.path().join("aft.db")).unwrap();

    upsert_bash_task(
        &conn,
        &direct_row("opencode", "session", "bash-shared", "running"),
    )
    .unwrap();
    upsert_bash_task(
        &conn,
        &direct_row("pi", "session", "bash-shared", "completed"),
    )
    .unwrap();

    let count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM bash_tasks WHERE session_id = 'session' AND task_id = 'bash-shared'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(count, 2);
}

#[test]
fn bash_tasks_dual_write_disabled_db_pool_skips_dual_write() {
    let project = tempfile::tempdir().unwrap();
    let storage = tempfile::tempdir().unwrap();
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    registry.set_harness(Harness::Opencode);

    let task_id = spawn_task(&registry, storage.path(), project.path(), "echo no-db");
    wait_for_json_status(storage.path(), SESSION, &task_id, BgTaskStatus::Completed);

    let conn = aft::db::open(&storage.path().join("aft.db")).unwrap();
    let count: i64 = conn
        .query_row("SELECT COUNT(*) FROM bash_tasks", [], |row| row.get(0))
        .unwrap();
    assert_eq!(count, 0);
    registry.detach();
}

fn direct_row(harness: &str, session_id: &str, task_id: &str, status: &str) -> BashTaskRow {
    BashTaskRow {
        harness: harness.to_string(),
        session_id: session_id.to_string(),
        task_id: task_id.to_string(),
        project_key: "project-key".to_string(),
        command: "echo ok".to_string(),
        cwd: PathBuf::from("/tmp").display().to_string(),
        status: status.to_string(),
        exit_code: None,
        pid: None,
        pgid: None,
        started_at: 1,
        completed_at: None,
        stdout_path: None,
        stderr_path: None,
        compressed: true,
        timeout_ms: None,
        completion_delivered: false,
        output_bytes: None,
        metadata: "{}".to_string(),
    }
}