Skip to main content

ralph/commands/run/run_loop/
orchestration.rs

1//! Sequential run-loop state machine.
2//!
3//! Responsibilities:
4//! - Route between sequential and parallel execution.
5//! - Drive per-iteration task execution, wait transitions, and abort handling.
6//!
7//! Not handled here:
8//! - Session recovery policy details.
9//! - Wait-loop file watching internals.
10//!
11//! Invariants/assumptions:
12//! - Queue lock contention, dirty repos, and queue validation failures are terminal.
13//! - Parallel execution handoff happens before sequential state is initialized.
14
15use anyhow::Result;
16
17use crate::config;
18use crate::contracts::TaskStatus;
19use crate::{queue, runutil};
20
21use super::lifecycle::LoopLifecycle;
22use super::session::resolve_resume_state;
23use super::types::RunLoopOptions;
24use super::wait::{WaitExit, WaitMode, wait_for_work};
25use crate::commands::run::queue_lock::{
26    clear_stale_queue_lock_for_resume, is_queue_lock_already_held_error, queue_lock_blocking_state,
27};
28use crate::commands::run::run_one::{RunOneResumeOptions, RunOutcome, run_one_with_handlers};
29use crate::commands::run::{emit_blocked_state_changed, emit_blocked_state_cleared};
30
31pub fn run_loop(resolved: &config::Resolved, opts: RunLoopOptions) -> Result<()> {
32    if let Some(result) = maybe_run_parallel(resolved, &opts)? {
33        return result;
34    }
35
36    let queue_file = queue::load_queue(&resolved.queue_path)?;
37    let include_draft = opts.agent_overrides.include_draft.unwrap_or(false);
38    let resume_state = resolve_resume_state(resolved, &opts)?;
39
40    if resume_state.resume_task_id.is_some()
41        && let Err(err) = clear_stale_queue_lock_for_resume(&resolved.repo_root)
42    {
43        log::warn!("Failed to clear stale queue lock for resume: {}", err);
44    }
45
46    let initial_todo_count = queue_file
47        .tasks
48        .iter()
49        .filter(|task| {
50            task.status == TaskStatus::Todo || (include_draft && task.status == TaskStatus::Draft)
51        })
52        .count() as u32;
53
54    if initial_todo_count == 0 && resume_state.resume_task_id.is_none() {
55        if include_draft {
56            log::info!("No todo or draft tasks found.");
57        } else {
58            log::info!("No todo tasks found.");
59        }
60        if !opts.wait_when_empty {
61            return Ok(());
62        }
63    }
64
65    let label = format!(
66        "RunLoop (todo={initial_todo_count}, max_tasks={})",
67        opts.max_tasks
68    );
69    let mut lifecycle =
70        LoopLifecycle::start(resolved, initial_todo_count, resume_state.completed_count);
71
72    let result = crate::commands::run::logging::with_scope(&label, || {
73        run_loop_state_machine(
74            resolved,
75            &opts,
76            include_draft,
77            resume_state.resume_task_id.as_deref(),
78            &mut lifecycle,
79        )
80    });
81
82    lifecycle.finish(resolved, &opts, &result);
83    result
84}
85
86fn maybe_run_parallel(
87    resolved: &config::Resolved,
88    opts: &RunLoopOptions,
89) -> Result<Option<Result<()>>> {
90    let parallel_workers = opts.parallel_workers.or(resolved.config.parallel.workers);
91    if let Some(workers) = parallel_workers
92        && workers >= 2
93    {
94        if opts.auto_resume {
95            log::warn!("Parallel run ignores --resume; starting a fresh parallel loop.");
96        }
97        if opts.starting_completed != 0 {
98            log::warn!("Parallel run ignores starting_completed; counters will start at zero.");
99        }
100        return Ok(Some(crate::commands::run::parallel::run_loop_parallel(
101            resolved,
102            crate::commands::run::parallel::ParallelRunOptions {
103                max_tasks: opts.max_tasks,
104                workers,
105                agent_overrides: opts.agent_overrides.clone(),
106                force: opts.force,
107            },
108        )));
109    }
110
111    Ok(None)
112}
113
114fn run_loop_state_machine(
115    resolved: &config::Resolved,
116    opts: &RunLoopOptions,
117    include_draft: bool,
118    resume_task_id: Option<&str>,
119    lifecycle: &mut LoopLifecycle,
120) -> Result<()> {
121    let mut pending_resume_task_id = resume_task_id.map(str::to_string);
122    let mut active_blocking = None;
123
124    loop {
125        if lifecycle.max_tasks_reached(opts) {
126            log::info!(
127                "RunLoop: end (reached max task limit: {})",
128                lifecycle.completed()
129            );
130            return Ok(());
131        }
132
133        if lifecycle.stop_requested() {
134            log::info!("Stop signal detected; no new tasks will be started.");
135            lifecycle.clear_stop_signal();
136            return Ok(());
137        }
138
139        match run_one_with_handlers(
140            resolved,
141            &opts.agent_overrides,
142            opts.force,
143            RunOneResumeOptions::resolved(pending_resume_task_id.take()),
144            None,
145            opts.run_event_handler.clone(),
146        ) {
147            Ok(RunOutcome::NoCandidates) => {
148                let idle_state = crate::contracts::BlockingState::idle(include_draft)
149                    .with_observed_at(crate::timeutil::now_utc_rfc3339_or_fallback());
150                active_blocking = Some(idle_state.clone());
151                emit_blocked_state_changed(&idle_state, opts.run_event_handler.as_ref());
152
153                if !opts.wait_when_empty {
154                    log::info!("{}", idle_state.message);
155                    return Ok(());
156                }
157                match wait_for_work(
158                    resolved,
159                    include_draft,
160                    WaitMode::EmptyAllowed,
161                    opts.wait_poll_ms,
162                    opts.empty_poll_ms,
163                    0,
164                    opts.notify_when_unblocked,
165                    lifecycle.webhook_context(),
166                )? {
167                    WaitExit::RunnableAvailable { .. } => {
168                        log::info!("RunLoop: new runnable tasks detected; continuing");
169                        if active_blocking.take().is_some() {
170                            emit_blocked_state_cleared(opts.run_event_handler.as_ref());
171                        }
172                    }
173                    WaitExit::QueueStillIdle { state } => {
174                        log::info!("{}", state.message);
175                        return Ok(());
176                    }
177                    WaitExit::TimedOut { state } => {
178                        log::info!(
179                            "RunLoop: end (wait timeout reached while {})",
180                            state.message
181                        );
182                        return Ok(());
183                    }
184                    WaitExit::StopRequested { state } => {
185                        if let Some(state) = state {
186                            log::info!(
187                                "RunLoop: end (stop signal received while {})",
188                                state.message
189                            );
190                        } else {
191                            log::info!("RunLoop: end (stop signal received)");
192                        }
193                        return Ok(());
194                    }
195                }
196            }
197            Ok(RunOutcome::Blocked { summary, state }) => {
198                active_blocking = Some((*state).clone());
199
200                if !(opts.wait_when_blocked || opts.wait_when_empty) {
201                    log::info!(
202                        "{} (ready={} deps={} sched={})",
203                        state.message,
204                        summary.runnable_candidates,
205                        summary.blocked_by_dependencies,
206                        summary.blocked_by_schedule
207                    );
208                    return Ok(());
209                }
210
211                let mode = if opts.wait_when_empty {
212                    WaitMode::EmptyAllowed
213                } else {
214                    WaitMode::BlockedOnly
215                };
216
217                match wait_for_work(
218                    resolved,
219                    include_draft,
220                    mode,
221                    opts.wait_poll_ms,
222                    opts.empty_poll_ms,
223                    opts.wait_timeout_seconds,
224                    opts.notify_when_unblocked,
225                    lifecycle.webhook_context(),
226                )? {
227                    WaitExit::RunnableAvailable {
228                        summary: new_summary,
229                    } => {
230                        log::info!(
231                            "RunLoop: unblocked (ready={}, deps={}, sched={}); continuing",
232                            new_summary.runnable_candidates,
233                            new_summary.blocked_by_dependencies,
234                            new_summary.blocked_by_schedule
235                        );
236                        if active_blocking.take().is_some() {
237                            emit_blocked_state_cleared(opts.run_event_handler.as_ref());
238                        }
239                    }
240                    WaitExit::QueueStillIdle { state } => {
241                        log::info!("{}", state.message);
242                        return Ok(());
243                    }
244                    WaitExit::TimedOut { state } => {
245                        log::info!(
246                            "RunLoop: end (wait timeout reached while {})",
247                            state.message
248                        );
249                        return Ok(());
250                    }
251                    WaitExit::StopRequested { state } => {
252                        if let Some(state) = state {
253                            log::info!(
254                                "RunLoop: end (stop signal received while {})",
255                                state.message
256                            );
257                        } else {
258                            log::info!("RunLoop: end (stop signal received)");
259                        }
260                        return Ok(());
261                    }
262                }
263            }
264            Ok(RunOutcome::Ran { .. }) => {
265                if active_blocking.take().is_some() {
266                    emit_blocked_state_cleared(opts.run_event_handler.as_ref());
267                }
268                lifecycle.record_success()
269            }
270            Err(err) => {
271                if let Some(reason) = runutil::abort_reason(&err) {
272                    match reason {
273                        runutil::RunAbortReason::Interrupted => {
274                            log::info!("RunLoop: aborting after interrupt");
275                        }
276                        runutil::RunAbortReason::UserRevert => {
277                            log::info!("RunLoop: aborting after user-requested revert");
278                        }
279                    }
280                    return Err(err);
281                }
282
283                if is_queue_lock_already_held_error(&err) {
284                    if let Some(state) = queue_lock_blocking_state(&resolved.repo_root, &err) {
285                        emit_blocked_state_changed(&state, opts.run_event_handler.as_ref());
286                    }
287                    log::error!("RunLoop: aborting due to queue lock contention");
288                    return Err(err);
289                }
290                if runutil::is_dirty_repo_error(&err) {
291                    log::error!("RunLoop: aborting due to dirty repository");
292                    return Err(err);
293                }
294                if runutil::is_queue_validation_error(&err) {
295                    log::error!("RunLoop: aborting due to queue validation error");
296                    return Err(err);
297                }
298                if let Some(ci_failure) =
299                    err.downcast_ref::<crate::commands::run::supervision::CiFailure>()
300                {
301                    emit_blocked_state_changed(
302                        &ci_failure.blocking_state(),
303                        opts.run_event_handler.as_ref(),
304                    );
305                }
306
307                lifecycle.record_failure(&err)?;
308                return Err(err);
309            }
310        }
311    }
312}