1use crate::agent::AgentOverrides;
18use crate::config;
19use crate::constants::limits::MAX_CONSECUTIVE_FAILURES;
20use crate::contracts::TaskStatus;
21use crate::session::{self, SessionValidationResult};
22use crate::signal;
23use crate::{queue, runutil, webhook};
24use anyhow::Result;
25
26use super::queue_lock::{clear_stale_queue_lock_for_resume, is_queue_lock_already_held_error};
27use super::run_one::{RunOutcome, run_one};
28
29pub struct RunLoopOptions {
30 pub max_tasks: u32,
32 pub agent_overrides: AgentOverrides,
33 pub force: bool,
34 pub auto_resume: bool,
36 pub starting_completed: u32,
38 pub non_interactive: bool,
40 pub parallel_workers: Option<u8>,
42 pub wait_when_blocked: bool,
44 pub wait_poll_ms: u64,
46 pub wait_timeout_seconds: u64,
48 pub notify_when_unblocked: bool,
50 pub wait_when_empty: bool,
52 pub empty_poll_ms: u64,
54}
55
56pub fn run_loop(resolved: &config::Resolved, opts: RunLoopOptions) -> Result<()> {
57 let parallel_workers = opts.parallel_workers.or(resolved.config.parallel.workers);
58 if let Some(workers) = parallel_workers
59 && workers >= 2
60 {
61 if opts.auto_resume {
62 log::warn!("Parallel run ignores --resume; starting a fresh parallel loop.");
63 }
64 if opts.starting_completed != 0 {
65 log::warn!("Parallel run ignores starting_completed; counters will start at zero.");
66 }
67 return super::parallel::run_loop_parallel(
68 resolved,
69 super::parallel::ParallelRunOptions {
70 max_tasks: opts.max_tasks,
71 workers,
72 agent_overrides: opts.agent_overrides,
73 force: opts.force,
74 },
75 );
76 }
77
78 let cache_dir = resolved.repo_root.join(".ralph/cache");
79 let queue_file = queue::load_queue(&resolved.queue_path)?;
80
81 let session_timeout_hours = resolved.config.agent.session_timeout_hours;
83 let (resume_task_id, completed_count) =
84 match session::check_session(&cache_dir, &queue_file, session_timeout_hours)? {
85 SessionValidationResult::NoSession => (None, opts.starting_completed),
86 SessionValidationResult::Valid(session) => {
87 if opts.auto_resume {
88 log::info!("Auto-resuming session for task {}", session.task_id);
89 (Some(session.task_id), session.tasks_completed_in_loop)
90 } else {
91 match session::prompt_session_recovery(&session, opts.non_interactive)? {
92 true => (Some(session.task_id), session.tasks_completed_in_loop),
93 false => {
94 session::clear_session(&cache_dir)?;
95 (None, opts.starting_completed)
96 }
97 }
98 }
99 }
100 SessionValidationResult::Stale { reason } => {
101 log::info!("Stale session cleared: {}", reason);
102 session::clear_session(&cache_dir)?;
103 (None, opts.starting_completed)
104 }
105 SessionValidationResult::Timeout { hours, session } => {
106 let threshold = session_timeout_hours
107 .unwrap_or(crate::constants::timeouts::DEFAULT_SESSION_TIMEOUT_HOURS);
108 match session::prompt_session_recovery_timeout(
109 &session,
110 hours,
111 threshold,
112 opts.non_interactive,
113 )? {
114 true => (Some(session.task_id), session.tasks_completed_in_loop),
115 false => {
116 session::clear_session(&cache_dir)?;
117 (None, opts.starting_completed)
118 }
119 }
120 }
121 };
122
123 if resume_task_id.is_some()
127 && let Err(err) = clear_stale_queue_lock_for_resume(&resolved.repo_root)
128 {
129 log::warn!("Failed to clear stale queue lock for resume: {}", err);
130 }
133
134 let include_draft = opts.agent_overrides.include_draft.unwrap_or(false);
135 let initial_todo_count = queue_file
136 .tasks
137 .iter()
138 .filter(|t| {
139 t.status == TaskStatus::Todo || (include_draft && t.status == TaskStatus::Draft)
140 })
141 .count() as u32;
142
143 if initial_todo_count == 0 && resume_task_id.is_none() {
144 if include_draft {
146 log::info!("No todo or draft tasks found.");
147 } else {
148 log::info!("No todo tasks found.");
149 }
150 if !opts.wait_when_empty {
151 return Ok(());
152 }
153 }
155
156 let label = format!(
157 "RunLoop (todo={initial_todo_count}, max_tasks={})",
158 opts.max_tasks
159 );
160
161 let mut tasks_attempted: usize = 0;
163 let mut tasks_succeeded: usize = 0;
164 let mut tasks_failed: usize = 0;
165
166 let mut consecutive_failures: u32 = 0;
168
169 let mut completed = completed_count;
171
172 signal::clear_stop_signal_at_loop_start(&cache_dir);
174
175 let loop_start_time = std::time::Instant::now();
177 let loop_started_at = crate::timeutil::now_utc_rfc3339_or_fallback();
178 let loop_webhook_ctx = crate::webhook::WebhookContext {
179 repo_root: Some(resolved.repo_root.display().to_string()),
180 branch: crate::git::current_branch(&resolved.repo_root).ok(),
181 commit: crate::session::get_git_head_commit(&resolved.repo_root),
182 ..Default::default()
183 };
184 webhook::notify_loop_started(
185 &resolved.config.agent.webhook,
186 &loop_started_at,
187 loop_webhook_ctx.clone(),
188 );
189
190 let result = super::logging::with_scope(&label, || {
191 loop {
192 if opts.max_tasks != 0 && completed >= opts.max_tasks {
193 log::info!("RunLoop: end (reached max task limit: {completed})");
194 return Ok(());
195 }
196
197 if signal::stop_signal_exists(&cache_dir) {
199 log::info!("Stop signal detected; no new tasks will be started.");
200 if let Err(e) = signal::clear_stop_signal(&cache_dir) {
201 log::warn!("Failed to clear stop signal: {}", e);
202 }
203 return Ok(());
204 }
205
206 match run_one(
207 resolved,
208 &opts.agent_overrides,
209 opts.force,
210 resume_task_id.as_deref(),
211 ) {
212 Ok(RunOutcome::NoCandidates) => {
213 if opts.wait_when_empty {
214 match wait_for_work(
216 resolved,
217 include_draft,
218 WaitMode::EmptyAllowed,
219 opts.wait_poll_ms,
220 opts.empty_poll_ms,
221 0, opts.notify_when_unblocked,
223 &loop_webhook_ctx,
224 )? {
225 WaitExit::RunnableAvailable { .. } => {
226 log::info!("RunLoop: new runnable tasks detected; continuing");
227 continue;
228 }
229 WaitExit::NoCandidates => {
230 continue;
232 }
233 WaitExit::TimedOut => {
234 log::info!("RunLoop: end (wait timeout reached)");
235 return Ok(());
236 }
237 WaitExit::StopRequested => {
238 log::info!("RunLoop: end (stop signal received)");
239 return Ok(());
240 }
241 }
242 } else {
243 log::info!("RunLoop: end (no more todo tasks remaining)");
244 return Ok(());
245 }
246 }
247 Ok(RunOutcome::Blocked { summary }) => {
248 if opts.wait_when_blocked || opts.wait_when_empty {
249 let mode = if opts.wait_when_empty {
251 WaitMode::EmptyAllowed
252 } else {
253 WaitMode::BlockedOnly
254 };
255 match wait_for_work(
257 resolved,
258 include_draft,
259 mode,
260 opts.wait_poll_ms,
261 opts.empty_poll_ms,
262 opts.wait_timeout_seconds,
263 opts.notify_when_unblocked,
264 &loop_webhook_ctx,
265 )? {
266 WaitExit::RunnableAvailable {
267 summary: new_summary,
268 } => {
269 log::info!(
270 "RunLoop: unblocked (ready={}, deps={}, sched={}); continuing",
271 new_summary.runnable_candidates,
272 new_summary.blocked_by_dependencies,
273 new_summary.blocked_by_schedule
274 );
275 continue;
276 }
277 WaitExit::NoCandidates => {
278 log::info!("RunLoop: end (queue became empty while waiting)");
279 return Ok(());
280 }
281 WaitExit::TimedOut => {
282 log::info!("RunLoop: end (wait timeout reached)");
283 return Ok(());
284 }
285 WaitExit::StopRequested => {
286 log::info!("RunLoop: end (stop signal received)");
287 return Ok(());
288 }
289 }
290 } else {
291 log::info!(
293 "RunLoop: end (blocked: ready={} deps={} sched={}). \
294 Use --wait-when-blocked to wait for dependencies/schedules.",
295 summary.runnable_candidates,
296 summary.blocked_by_dependencies,
297 summary.blocked_by_schedule
298 );
299 return Ok(());
300 }
301 }
302 Ok(RunOutcome::Ran { task_id: _ }) => {
303 completed += 1;
304 tasks_attempted += 1;
305 tasks_succeeded += 1;
306 consecutive_failures = 0; if let Err(e) = session::increment_session_progress(&cache_dir) {
310 log::warn!("Failed to persist session progress: {}", e);
311 }
312
313 if initial_todo_count == 0 {
314 log::info!("RunLoop: task-complete (completed={completed})");
315 } else {
316 log::info!("RunLoop: task-complete ({completed}/{initial_todo_count})");
317 }
318 }
319 Err(err) => {
320 if let Some(reason) = runutil::abort_reason(&err) {
321 match reason {
322 runutil::RunAbortReason::Interrupted => {
323 log::info!("RunLoop: aborting after interrupt");
324 }
325 runutil::RunAbortReason::UserRevert => {
326 log::info!("RunLoop: aborting after user-requested revert");
327 }
328 }
329 return Err(err);
330 }
331
332 if is_queue_lock_already_held_error(&err) {
335 log::error!("RunLoop: aborting due to queue lock contention");
336 return Err(err);
337 }
338
339 if runutil::is_dirty_repo_error(&err) {
343 log::error!("RunLoop: aborting due to dirty repository");
344 return Err(err);
345 }
346
347 if runutil::is_queue_validation_error(&err) {
352 log::error!("RunLoop: aborting due to queue validation error");
353 return Err(err);
354 }
355
356 completed += 1;
357 tasks_attempted += 1;
358 tasks_failed += 1;
359 consecutive_failures += 1;
360
361 if let Err(e) = session::increment_session_progress(&cache_dir) {
363 log::warn!("Failed to persist session progress: {}", e);
364 }
365
366 log::error!("RunLoop: task failed: {:#}", err);
367
368 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
370 log::error!(
371 "RunLoop: aborting after {MAX_CONSECUTIVE_FAILURES} consecutive failures"
372 );
373 return Err(anyhow::anyhow!(
374 "Run loop aborted after {} consecutive task failures. \
375 This usually indicates a systemic issue (e.g., repo dirty, \
376 runner misconfiguration, or interrupt flag stuck). \
377 Check logs above for root cause.",
378 MAX_CONSECUTIVE_FAILURES
379 ));
380 }
381 }
383 }
384 }
385 });
386
387 if tasks_attempted > 0 {
389 let notify_config = crate::notification::build_notification_config(
390 &resolved.config.agent.notification,
391 &crate::notification::NotificationOverrides {
392 notify_on_complete: opts.agent_overrides.notify_on_complete,
393 notify_on_fail: opts.agent_overrides.notify_on_fail,
394 notify_sound: opts.agent_overrides.notify_sound,
395 },
396 );
397 crate::notification::notify_loop_complete(
398 tasks_attempted,
399 tasks_succeeded,
400 tasks_failed,
401 ¬ify_config,
402 );
403 }
404
405 let loop_stopped_at = crate::timeutil::now_utc_rfc3339_or_fallback();
407 let loop_duration_ms = loop_start_time.elapsed().as_millis() as u64;
408 let loop_note = match &result {
409 Ok(()) => Some(format!(
410 "Completed: {}/{} succeeded",
411 tasks_succeeded, tasks_attempted
412 )),
413 Err(e) => Some(format!("Error: {}", e)),
414 };
415 webhook::notify_loop_stopped(
416 &resolved.config.agent.webhook,
417 &loop_stopped_at,
418 webhook::WebhookContext {
419 duration_ms: Some(loop_duration_ms),
420 ..loop_webhook_ctx
421 },
422 loop_note.as_deref(),
423 );
424
425 if result.is_ok()
427 && let Err(e) = session::clear_session(&cache_dir)
428 {
429 log::warn!("Failed to clear session on loop completion: {}", e);
430 }
431
432 result
433}
434
/// Controls how `wait_for_work` reacts to a queue with no candidates at all,
/// and which poll interval it uses.
#[derive(Debug)]
enum WaitMode {
    /// Waiting on blocked tasks: an empty queue ends the wait with
    /// `WaitExit::NoCandidates`. Polls with the blocked-wait interval.
    BlockedOnly,
    /// Keep waiting even when the queue is empty. Polls with the empty-wait
    /// interval.
    EmptyAllowed,
}
443
444enum WaitExit {
446 RunnableAvailable {
448 summary: crate::queue::operations::QueueRunnabilitySummary,
449 },
450 NoCandidates,
452 TimedOut,
454 StopRequested,
456}
457
458struct QueueFileWatcher {
460 _watcher: notify::RecommendedWatcher,
461 rx: std::sync::mpsc::Receiver<notify::Result<notify::Event>>,
462}
463
464impl QueueFileWatcher {
465 fn new(resolved: &config::Resolved) -> anyhow::Result<Self> {
466 use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
467 use std::sync::mpsc::channel;
468
469 let (tx, rx) = channel();
470 let mut watcher = RecommendedWatcher::new(
471 move |res| {
472 let _ = tx.send(res);
473 },
474 Config::default(),
475 )?;
476
477 let ralph_dir = resolved.repo_root.join(".ralph");
479 if ralph_dir.exists() {
480 watcher.watch(&ralph_dir, RecursiveMode::NonRecursive)?;
481 }
482
483 Ok(Self {
484 _watcher: watcher,
485 rx,
486 })
487 }
488
489 fn recv_timeout(&self, dur: std::time::Duration) -> Result<(), ()> {
490 match self.rx.recv_timeout(dur) {
491 Ok(_) => Ok(()),
492 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(()),
493 Err(_) => Err(()),
494 }
495 }
496}
497
498#[allow(clippy::too_many_arguments)]
502fn wait_for_work(
503 resolved: &config::Resolved,
504 include_draft: bool,
505 mode: WaitMode,
506 blocked_poll_ms: u64,
507 empty_poll_ms: u64,
508 timeout_seconds: u64,
509 notify_when_unblocked: bool,
510 loop_webhook_ctx: &crate::webhook::WebhookContext,
511) -> Result<WaitExit> {
512 use std::time::{Duration, Instant};
513
514 let cache_dir = resolved.repo_root.join(".ralph/cache");
515
516 let blocked_poll_ms = blocked_poll_ms.max(50);
518 let empty_poll_ms = empty_poll_ms.max(50);
519
520 let start = Instant::now();
521 let tick = Duration::from_millis(250);
522
523 let ctrlc = crate::runner::ctrlc_state().ok();
525
526 let watcher = QueueFileWatcher::new(resolved).ok();
528 if watcher.is_none() {
529 log::debug!("File watcher setup failed, using poll-only mode");
530 }
531
532 let poll_ms = match mode {
533 WaitMode::BlockedOnly => blocked_poll_ms,
534 WaitMode::EmptyAllowed => empty_poll_ms,
535 };
536
537 log::info!(
538 "Waiting for runnable tasks (mode={:?}, poll={}ms, timeout={}s)...",
539 mode,
540 poll_ms,
541 if timeout_seconds == 0 {
542 "none".to_string()
543 } else {
544 timeout_seconds.to_string()
545 }
546 );
547
548 let mut last_eval = Instant::now();
549 let mut pending_event = true; loop {
552 if timeout_seconds != 0 {
554 let elapsed = start.elapsed().as_secs();
555 if elapsed >= timeout_seconds {
556 return Ok(WaitExit::TimedOut);
557 }
558 }
559
560 if signal::stop_signal_exists(&cache_dir) {
562 if let Err(e) = signal::clear_stop_signal(&cache_dir) {
563 log::warn!("Failed to clear stop signal: {}", e);
564 }
565 return Ok(WaitExit::StopRequested);
566 }
567
568 if ctrlc
570 .as_ref()
571 .is_some_and(|c| c.interrupted.load(std::sync::atomic::Ordering::SeqCst))
572 {
573 return Err(runutil::RunAbort::new(
574 runutil::RunAbortReason::Interrupted,
575 "Ctrl+C pressed while waiting for runnable tasks",
576 )
577 .into());
578 }
579
580 if let Some(ref w) = watcher {
582 if w.recv_timeout(tick).is_ok() {
583 pending_event = true;
584 }
585 } else {
586 std::thread::sleep(tick);
587 }
588
589 let poll_dur = Duration::from_millis(poll_ms);
591 if pending_event || last_eval.elapsed() >= poll_dur {
592 pending_event = false;
593 last_eval = Instant::now();
594
595 let queue_file = match queue::load_queue(&resolved.queue_path) {
597 Ok(q) => q,
598 Err(e) => {
599 log::warn!("Failed to load queue while waiting: {}; will retry", e);
600 continue;
601 }
602 };
603
604 let done = queue::load_queue_or_default(&resolved.done_path)?;
605 let done_ref = if done.tasks.is_empty() && !resolved.done_path.exists() {
606 None
607 } else {
608 Some(&done)
609 };
610
611 let options = queue::RunnableSelectionOptions::new(include_draft, true);
613 let report = match crate::queue::operations::queue_runnability_report(
614 &queue_file,
615 done_ref,
616 options,
617 ) {
618 Ok(r) => r,
619 Err(e) => {
620 log::warn!(
621 "Failed to generate runnability report while waiting: {}; will retry",
622 e
623 );
624 continue;
625 }
626 };
627
628 if report.summary.candidates_total == 0 {
630 match mode {
631 WaitMode::BlockedOnly => {
632 return Ok(WaitExit::NoCandidates);
633 }
634 WaitMode::EmptyAllowed => {
635 continue;
637 }
638 }
639 }
640
641 if report.summary.runnable_candidates > 0 {
642 if notify_when_unblocked {
644 notify_queue_unblocked(&report.summary, resolved, loop_webhook_ctx);
645 }
646 return Ok(WaitExit::RunnableAvailable {
647 summary: report.summary,
648 });
649 }
650
651 }
653 }
654}
655
656fn notify_queue_unblocked(
658 summary: &crate::queue::operations::QueueRunnabilitySummary,
659 resolved: &config::Resolved,
660 loop_webhook_ctx: &crate::webhook::WebhookContext,
661) {
662 let note = format!(
664 "ready={} blocked_deps={} blocked_schedule={}",
665 summary.runnable_candidates, summary.blocked_by_dependencies, summary.blocked_by_schedule
666 );
667
668 let notify_config = crate::notification::NotificationConfig {
670 enabled: true,
671 notify_on_complete: false,
672 notify_on_fail: false,
673 notify_on_loop_complete: false,
674 suppress_when_active: resolved
675 .config
676 .agent
677 .notification
678 .suppress_when_active
679 .unwrap_or(true),
680 sound_enabled: resolved
681 .config
682 .agent
683 .notification
684 .sound_enabled
685 .unwrap_or(false),
686 sound_path: resolved.config.agent.notification.sound_path.clone(),
687 timeout_ms: resolved
688 .config
689 .agent
690 .notification
691 .timeout_ms
692 .unwrap_or(8000),
693 };
694
695 #[cfg(feature = "notifications")]
696 {
697 use notify_rust::{Notification, Timeout};
698 if let Err(e) = Notification::new()
699 .summary("Ralph: tasks runnable")
700 .body(¬e)
701 .timeout(Timeout::Milliseconds(notify_config.timeout_ms))
702 .show()
703 {
704 log::debug!("Failed to show unblocked notification: {}", e);
705 }
706
707 if notify_config.sound_enabled
708 && let Err(e) =
709 crate::notification::play_completion_sound(notify_config.sound_path.as_deref())
710 {
711 log::debug!("Failed to play unblocked sound: {}", e);
712 }
713 }
714
715 let timestamp = crate::timeutil::now_utc_rfc3339_or_fallback();
717 let payload = crate::webhook::WebhookPayload {
718 event: "queue_unblocked".to_string(),
719 timestamp,
720 task_id: None,
721 task_title: None,
722 previous_status: Some("blocked".to_string()),
723 current_status: Some("runnable".to_string()),
724 note: Some(note),
725 context: loop_webhook_ctx.clone(),
726 };
727 crate::webhook::send_webhook_payload(payload, &resolved.config.agent.webhook);
728}