ralph-agent-loop 0.4.0

A Rust CLI for managing AI agent loops with a structured JSON task queue
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
//! Queue lock handling tests for run command.

use super::{find_definitely_dead_pid, resolved_with_repo_root, task_with_status};
use crate::commands::run::run_session::create_session_for_task;
use crate::contracts::{QueueFile, TaskStatus};
use crate::queue;
use crate::testsupport::INTERRUPT_TEST_MUTEX;
use crate::testsupport::reset_ctrlc_interrupt_flag;
use std::sync::Mutex;

/// `run_one_with_id_locked` is the entry point used while the queue lock is
/// ALREADY held by the caller. This test holds the lock itself and verifies
/// the call fails on the task's non-runnable status — not on lock contention,
/// which would indicate the function wrongly tried to re-acquire the lock.
#[test]
fn run_one_with_id_locked_skips_reacquiring_queue_lock() -> anyhow::Result<()> {
    // Synchronize with tests that modify the interrupt flag.
    // Hold the mutex for the entire test to prevent any race conditions.
    let interrupt_mutex = INTERRUPT_TEST_MUTEX.get_or_init(|| Mutex::new(()));
    let _interrupt_guard = interrupt_mutex.lock().unwrap();
    reset_ctrlc_interrupt_flag();

    let temp = tempfile::TempDir::new()?;
    let repo_root = temp.path().to_path_buf();
    let resolved = resolved_with_repo_root(repo_root.clone());

    // Seed the queue with a single task that is already Done, so targeting it
    // by id must produce a TargetTaskNotRunnable error.
    std::fs::create_dir_all(repo_root.join(".ralph"))?;
    let task = crate::contracts::Task {
        id: "RQ-0001".to_string(),
        status: TaskStatus::Done,
        title: "Test task".to_string(),
        description: None,
        priority: Default::default(),
        tags: vec!["rust".to_string()],
        scope: vec!["crates/ralph".to_string()],
        evidence: vec!["observed".to_string()],
        plan: vec!["do thing".to_string()],
        notes: vec![],
        request: Some("test request".to_string()),
        agent: None,
        created_at: Some("2026-01-18T00:00:00Z".to_string()),
        updated_at: Some("2026-01-18T00:00:00Z".to_string()),
        completed_at: Some("2026-01-18T01:00:00Z".to_string()),
        started_at: None,
        scheduled_start: None,
        depends_on: vec![],
        blocks: vec![],
        relates_to: vec![],
        duplicates: None,
        custom_fields: std::collections::HashMap::new(),
        estimated_minutes: None,
        actual_minutes: None,
        parent_id: None,
    };
    queue::save_queue(
        &resolved.queue_path,
        &QueueFile {
            version: 1,
            tasks: vec![task],
        },
    )?;

    // Hold the queue lock for the remainder of the test. If the function under
    // test tried to re-acquire it, we would see a lock-contention error below.
    let _lock = queue::acquire_queue_lock(&resolved.repo_root, "test lock", false)?;

    let err = crate::commands::run::run_one_with_id_locked(
        &resolved,
        &crate::commands::run::AgentOverrides::default(),
        false,
        "RQ-0001",
        crate::commands::run::RunOneResumeOptions::disabled(),
        None,
        None,
        None,
    )
    .expect_err("expected runnable status error");

    // The failure must be a queue-query error about runnability...
    let query_err = err
        .downcast_ref::<crate::queue::operations::QueueQueryError>()
        .unwrap_or_else(|| panic!("expected QueueQueryError, got: {err:#}"));

    assert!(
        matches!(
            query_err,
            crate::queue::operations::QueueQueryError::TargetTaskNotRunnable {
                status: TaskStatus::Done,
                ..
            }
        ),
        "expected TargetTaskNotRunnable with Done status"
    );

    // ...and explicitly NOT a lock-contention error, proving the locked entry
    // point did not attempt to re-acquire the queue lock we already hold.
    assert!(
        !crate::commands::run::queue_lock::is_queue_lock_already_held_error(&err),
        "expected not to fail due to queue-lock contention"
    );

    Ok(())
}

/// A lock directory whose owner file names a dead PID must be removed by the
/// resume path's stale-lock cleanup.
#[test]
fn clear_stale_queue_lock_for_resume_removes_stale_lock() -> anyhow::Result<()> {
    let tmp = tempfile::TempDir::new()?;
    let root = tmp.path().to_path_buf();
    std::fs::create_dir_all(root.join(".ralph"))?;

    // Fabricate a lock owned by a process that is guaranteed not to be running.
    let lock_path = crate::lock::queue_lock_dir(&root);
    std::fs::create_dir_all(&lock_path)?;
    let dead_pid = find_definitely_dead_pid();
    let owner_contents = format!(
        "pid: {dead_pid}\nstarted_at: 2026-02-06T00:56:29Z\ncommand: ralph run loop --max-tasks 0\nlabel: run one\n"
    );
    std::fs::write(lock_path.join("owner"), owner_contents)?;

    crate::commands::run::queue_lock::clear_stale_queue_lock_for_resume(&root)?;

    assert!(
        !lock_path.exists(),
        "expected stale queue lock dir to be cleared during resume"
    );

    Ok(())
}

/// While a live process (this one) holds the queue lock, the resume path's
/// stale-lock cleanup must leave the lock directory untouched.
#[test]
fn clear_stale_queue_lock_for_resume_does_not_remove_live_lock() -> anyhow::Result<()> {
    let tmp = tempfile::TempDir::new()?;
    let root = tmp.path().to_path_buf();
    std::fs::create_dir_all(root.join(".ralph"))?;

    let lock_path = crate::lock::queue_lock_dir(&root);
    // Keep the guard alive for the whole test so the lock stays held.
    let _holder = queue::acquire_queue_lock(&root, "live holder", false)?;

    crate::commands::run::queue_lock::clear_stale_queue_lock_for_resume(&root)?;

    assert!(lock_path.exists(), "expected live queue lock dir to remain");
    let owner_contents = std::fs::read_to_string(lock_path.join("owner"))?;
    assert!(
        owner_contents.contains("live holder"),
        "expected lock owner label to be unchanged"
    );

    Ok(())
}

/// A lock owned by a dead PID must be inspected as Stale, with operator-facing
/// messaging that names the stale lock and points at `--force` for recovery.
#[test]
fn inspect_queue_lock_reports_stale_lock_as_stale_operator_state() -> anyhow::Result<()> {
    use crate::commands::run::queue_lock::{QueueLockCondition, inspect_queue_lock};

    let tmp = tempfile::TempDir::new()?;
    let root = tmp.path().to_path_buf();
    std::fs::create_dir_all(root.join(".ralph"))?;

    // Fabricate a lock whose owner process is guaranteed to be dead.
    let lock_path = crate::lock::queue_lock_dir(&root);
    std::fs::create_dir_all(&lock_path)?;
    let dead_pid = find_definitely_dead_pid();
    std::fs::write(
        lock_path.join("owner"),
        format!(
            "pid: {dead_pid}\nstarted_at: 2026-02-06T00:56:29Z\ncommand: ralph run loop --parallel 4\nlabel: run loop\n"
        ),
    )?;

    let inspection = inspect_queue_lock(&root).expect("expected queue lock inspection");

    assert_eq!(inspection.condition, QueueLockCondition::Stale);
    assert!(matches!(
        &inspection.blocking_state.reason,
        crate::contracts::BlockingReason::LockBlocked { .. }
    ));
    let state = &inspection.blocking_state;
    assert!(
        state.message.contains("stale queue lock"),
        "expected stale-specific message, got: {}",
        state.message
    );
    assert!(
        state.detail.contains("--force"),
        "expected stale-specific recovery detail, got: {}",
        state.detail
    );

    Ok(())
}

/// An owner file naming our own (live) PID but with an implausibly old start
/// time must be reported Live with PID-reuse review guidance, and must state
/// that this condition is never auto-cleared.
#[test]
fn inspect_queue_lock_reports_pid_reuse_review_for_aged_live_owner() -> anyhow::Result<()> {
    use crate::commands::run::queue_lock::{QueueLockCondition, inspect_queue_lock};

    let tmp = tempfile::TempDir::new()?;
    let root = tmp.path().to_path_buf();
    std::fs::create_dir_all(root.join(".ralph"))?;

    // Use the current process id so the PID check sees a live process, paired
    // with a start timestamp far in the past.
    let lock_path = crate::lock::queue_lock_dir(&root);
    std::fs::create_dir_all(&lock_path)?;
    let live_pid = std::process::id();
    std::fs::write(
        lock_path.join("owner"),
        format!(
            "pid: {live_pid}\nstarted_at: 2020-01-01T00:00:00Z\ncommand: ralph run loop --parallel 4\nlabel: run loop\n"
        ),
    )?;

    let inspection = inspect_queue_lock(&root).expect("expected queue lock inspection");

    assert_eq!(inspection.condition, QueueLockCondition::Live);
    let detail = &inspection.blocking_state.detail;
    assert!(
        detail.contains("reused PID"),
        "expected PID-reuse review detail, got: {detail}"
    );
    assert!(
        detail.contains("does not auto-clear"),
        "expected conservative cleanup detail, got: {detail}"
    );

    Ok(())
}

/// End-to-end check that `run_loop` with `auto_resume: true` clears a stale
/// queue lock (dead-PID owner) before it would attempt task execution. The
/// loop itself is prevented from running tasks by pre-setting the global
/// ctrl-c interrupt flag, so only the resume/cleanup path is exercised.
#[test]
fn run_loop_auto_resume_clears_stale_queue_lock_before_task_execution() -> anyhow::Result<()> {
    use std::sync::atomic::Ordering;

    // RAII guard that restores the process-wide interrupt flag to its prior
    // value on drop — including on panic — so other tests are not poisoned.
    struct InterruptGuard {
        previous: bool,
    }

    impl Drop for InterruptGuard {
        fn drop(&mut self) {
            if let Ok(ctrlc) = crate::runner::ctrlc_state() {
                ctrlc.interrupted.store(self.previous, Ordering::SeqCst);
            }
        }
    }

    // Acquire lock to prevent other tests from running while we have the interrupt flag set.
    // This ensures tests that use the runner don't see the flag as true.
    let interrupt_mutex = INTERRUPT_TEST_MUTEX.get_or_init(|| Mutex::new(()));
    let _interrupt_guard = interrupt_mutex.lock().unwrap();

    let temp = tempfile::TempDir::new()?;
    let repo_root = temp.path().to_path_buf();
    std::fs::create_dir_all(repo_root.join(".ralph/cache"))?;

    let resolved = resolved_with_repo_root(repo_root.clone());

    // Valid resumable session: Doing task + session.jsonc present.
    queue::save_queue(
        &resolved.queue_path,
        &QueueFile {
            version: 1,
            tasks: vec![task_with_status(TaskStatus::Doing)],
        },
    )?;
    queue::save_queue(&resolved.done_path, &QueueFile::default())?;

    let session = create_session_for_task(
        "RQ-0001",
        &resolved,
        &crate::commands::run::AgentOverrides::default(),
        1,
        None,
    );
    crate::session::save_session(&repo_root.join(".ralph/cache"), &session)?;

    // Stale queue lock left behind by a dead process.
    let lock_dir = crate::lock::queue_lock_dir(&repo_root);
    std::fs::create_dir_all(&lock_dir)?;
    let stale_pid = find_definitely_dead_pid();
    std::fs::write(
        lock_dir.join("owner"),
        format!(
            "pid: {stale_pid}\nstarted_at: 2026-02-06T00:56:29Z\ncommand: ralph run loop --max-tasks 0\nlabel: run one\n"
        ),
    )?;

    // Prevent the loop from executing the task; we only care that the resume path
    // cleared the stale lock before attempting `run_one`.
    // NOTE: We set the interrupted flag to prevent the loop from actually running tasks.
    // This is a global state mutation that must be restored even if the test panics.
    // We set the flag as close to the run_loop call as possible to minimize interference
    // with other tests that may be running in parallel.
    let ctrlc =
        crate::runner::ctrlc_state().map_err(|e| anyhow::anyhow!("ctrlc init failed: {e}"))?;
    let guard = InterruptGuard {
        previous: ctrlc.interrupted.load(Ordering::SeqCst),
    };
    // Set interrupted immediately before run_loop to minimize the window where
    // other tests might see the flag as true
    ctrlc.interrupted.store(true, Ordering::SeqCst);

    let result = crate::commands::run::run_loop(
        &resolved,
        crate::commands::run::RunLoopOptions {
            max_tasks: 0,
            agent_overrides: crate::commands::run::AgentOverrides::default(),
            force: false,
            auto_resume: true,
            starting_completed: 0,
            non_interactive: true,
            parallel_workers: None,
            wait_when_blocked: false,
            wait_poll_ms: 1000,
            wait_timeout_seconds: 0,
            notify_when_unblocked: false,
            wait_when_empty: false,
            empty_poll_ms: 30_000,
            run_event_handler: None,
        },
    );
    // Restore the interrupt flag eagerly (before assertions can panic out).
    drop(guard);

    assert!(result.is_err(), "expected run_loop to abort early");
    assert!(
        !lock_dir.exists(),
        "expected stale queue lock to be cleared during resume"
    );

    Ok(())
}

/// A worker thread attempting to acquire the queue lock while the main thread
/// holds it must fail with a lock-contention error.
///
/// The previous version tracked only a `lock_acquired: AtomicBool` and
/// asserted it was false — which also passed when the worker failed for a
/// completely unrelated reason (I/O error, bad path), silently masking real
/// breakage. The worker now reports the error text and we assert it is
/// specifically a contention error.
#[test]
fn run_one_parallel_worker_acquires_queue_lock() -> anyhow::Result<()> {
    use crate::contracts::Task;
    use std::thread;

    // Seed a repo with a single Todo task so the queue file is well-formed.
    let temp = tempfile::TempDir::new()?;
    let repo_root = temp.path().to_path_buf();
    let ralph_dir = repo_root.join(".ralph");
    std::fs::create_dir_all(&ralph_dir)?;

    let queue_path = ralph_dir.join("queue.json");
    let queue_file = QueueFile {
        version: 1,
        tasks: vec![Task {
            id: "RQ-0001".to_string(),
            title: "Test task".to_string(),
            description: None,
            status: TaskStatus::Todo,
            priority: crate::contracts::TaskPriority::Medium,
            tags: vec![],
            scope: vec![],
            evidence: vec![],
            plan: vec![],
            notes: vec![],
            request: None,
            agent: None,
            created_at: Some("2026-01-01T00:00:00Z".to_string()),
            updated_at: Some("2026-01-01T00:00:00Z".to_string()),
            completed_at: None,
            started_at: None,
            scheduled_start: None,
            depends_on: vec![],
            blocks: vec![],
            relates_to: vec![],
            duplicates: None,
            custom_fields: std::collections::HashMap::new(),
            estimated_minutes: None,
            actual_minutes: None,
            parent_id: None,
        }],
    };
    queue::save_queue(&queue_path, &queue_file)?;

    // Hold the queue lock on the main thread so the worker must contend.
    let _test_lock = queue::acquire_queue_lock(&repo_root, "test lock", false)?;

    // Worker returns None on (unexpected) success, or the error text on failure.
    let repo_root_clone = repo_root.clone();
    let handle = thread::spawn(move || {
        match queue::acquire_queue_lock(&repo_root_clone, "parallel worker", false) {
            Ok(_guard) => None,
            Err(err) => Some(err.to_string()),
        }
    });

    let outcome = handle.join().expect("thread panicked");
    let err_text =
        outcome.expect("expected queue lock acquisition to fail while the lock is already held");
    // Require a contention-specific error, not just "any error".
    assert!(
        err_text.contains("already held"),
        "expected lock-contention error, got: {err_text}"
    );

    Ok(())
}