Skip to main content

ralph/commands/run/run_loop/
orchestration.rs

1//! Sequential run-loop state machine.
2//!
3//! Responsibilities:
4//! - Route between sequential and parallel execution.
5//! - Drive per-iteration task execution, wait transitions, and abort handling.
6//!
7//! Not handled here:
8//! - Session recovery policy details.
9//! - Wait-loop file watching internals.
10//!
11//! Invariants/assumptions:
12//! - Queue lock contention, dirty repos, and queue validation failures are terminal.
13//! - Parallel execution handoff happens before sequential state is initialized.
14
15use anyhow::Result;
16
17use crate::config;
18use crate::contracts::TaskStatus;
19use crate::{queue, runutil};
20
21use super::lifecycle::LoopLifecycle;
22use super::session::resolve_resume_state;
23use super::types::RunLoopOptions;
24use super::wait::{WaitExit, WaitMode, wait_for_work};
25use crate::commands::run::queue_lock::{
26    clear_stale_queue_lock_for_resume, is_queue_lock_already_held_error, queue_lock_blocking_state,
27};
28use crate::commands::run::run_one::{RunOneResumeOptions, RunOutcome, run_one_with_handlers};
29use crate::commands::run::{emit_blocked_state_changed, emit_blocked_state_cleared};
30
31pub fn run_loop(resolved: &config::Resolved, opts: RunLoopOptions) -> Result<()> {
32    if let Some(result) = maybe_run_parallel(resolved, &opts)? {
33        return result;
34    }
35
36    let queue_file = queue::load_queue(&resolved.queue_path)?;
37    let include_draft = opts.agent_overrides.include_draft.unwrap_or(false);
38    let resume_state = resolve_resume_state(resolved, &opts)?;
39
40    if resume_state.resume_task_id.is_some()
41        && let Err(err) = clear_stale_queue_lock_for_resume(&resolved.repo_root)
42    {
43        log::warn!("Failed to clear stale queue lock for resume: {}", err);
44    }
45
46    let initial_todo_count = queue_file
47        .tasks
48        .iter()
49        .filter(|task| {
50            task.status == TaskStatus::Todo || (include_draft && task.status == TaskStatus::Draft)
51        })
52        .count() as u32;
53
54    if initial_todo_count == 0 && resume_state.resume_task_id.is_none() {
55        if include_draft {
56            log::info!("No todo or draft tasks found.");
57        } else {
58            log::info!("No todo tasks found.");
59        }
60        if !opts.wait_when_empty {
61            return Ok(());
62        }
63    }
64
65    let label = format!(
66        "RunLoop (todo={initial_todo_count}, max_tasks={})",
67        opts.max_tasks
68    );
69    let mut lifecycle =
70        LoopLifecycle::start(resolved, initial_todo_count, resume_state.completed_count);
71
72    let result = crate::commands::run::logging::with_scope(&label, || {
73        run_loop_state_machine(
74            resolved,
75            &opts,
76            include_draft,
77            resume_state.resume_task_id.as_deref(),
78            &mut lifecycle,
79        )
80    });
81
82    lifecycle.finish(resolved, &opts, &result);
83    result
84}
85
86fn maybe_run_parallel(
87    resolved: &config::Resolved,
88    opts: &RunLoopOptions,
89) -> Result<Option<Result<()>>> {
90    let parallel_workers = opts.parallel_workers.or(resolved.config.parallel.workers);
91    if let Some(workers) = parallel_workers
92        && workers >= 2
93    {
94        if opts.auto_resume {
95            log::warn!("Parallel run ignores --resume; starting a fresh parallel loop.");
96        }
97        if opts.starting_completed != 0 {
98            log::warn!("Parallel run ignores starting_completed; counters will start at zero.");
99        }
100        return Ok(Some(crate::commands::run::parallel::run_loop_parallel(
101            resolved,
102            crate::commands::run::parallel::ParallelRunOptions {
103                max_tasks: opts.max_tasks,
104                workers,
105                agent_overrides: opts.agent_overrides.clone(),
106                force: opts.force,
107            },
108        )));
109    }
110
111    Ok(None)
112}
113
114fn run_loop_state_machine(
115    resolved: &config::Resolved,
116    opts: &RunLoopOptions,
117    include_draft: bool,
118    resume_task_id: Option<&str>,
119    lifecycle: &mut LoopLifecycle,
120) -> Result<()> {
121    let mut pending_resume_task_id = resume_task_id.map(str::to_string);
122    let mut active_blocking = None;
123
124    loop {
125        if lifecycle.max_tasks_reached(opts) {
126            log::info!(
127                "RunLoop: end (reached max task limit: {})",
128                lifecycle.completed()
129            );
130            return Ok(());
131        }
132
133        if lifecycle.stop_requested() {
134            log::info!("Stop signal detected; no new tasks will be started.");
135            lifecycle.clear_stop_signal();
136            return Ok(());
137        }
138
139        match run_one_with_handlers(
140            resolved,
141            &opts.agent_overrides,
142            opts.force,
143            RunOneResumeOptions::resolved(pending_resume_task_id.take()),
144            None,
145            opts.run_event_handler.clone(),
146        ) {
147            Ok(RunOutcome::NoCandidates) => {
148                let idle_state = crate::contracts::BlockingState::idle(include_draft);
149                active_blocking = Some(idle_state.clone());
150                emit_blocked_state_changed(&idle_state, opts.run_event_handler.as_ref());
151
152                if !opts.wait_when_empty {
153                    log::info!("{}", idle_state.message);
154                    return Ok(());
155                }
156                match wait_for_work(
157                    resolved,
158                    include_draft,
159                    WaitMode::EmptyAllowed,
160                    opts.wait_poll_ms,
161                    opts.empty_poll_ms,
162                    0,
163                    opts.notify_when_unblocked,
164                    lifecycle.webhook_context(),
165                )? {
166                    WaitExit::RunnableAvailable { .. } => {
167                        log::info!("RunLoop: new runnable tasks detected; continuing");
168                        if active_blocking.take().is_some() {
169                            emit_blocked_state_cleared(opts.run_event_handler.as_ref());
170                        }
171                    }
172                    WaitExit::QueueStillIdle { state } => {
173                        log::info!("{}", state.message);
174                        return Ok(());
175                    }
176                    WaitExit::TimedOut { state } => {
177                        log::info!(
178                            "RunLoop: end (wait timeout reached while {})",
179                            state.message
180                        );
181                        return Ok(());
182                    }
183                    WaitExit::StopRequested { state } => {
184                        if let Some(state) = state {
185                            log::info!(
186                                "RunLoop: end (stop signal received while {})",
187                                state.message
188                            );
189                        } else {
190                            log::info!("RunLoop: end (stop signal received)");
191                        }
192                        return Ok(());
193                    }
194                }
195            }
196            Ok(RunOutcome::Blocked { summary, state }) => {
197                active_blocking = Some((*state).clone());
198
199                if !(opts.wait_when_blocked || opts.wait_when_empty) {
200                    log::info!(
201                        "{} (ready={} deps={} sched={})",
202                        state.message,
203                        summary.runnable_candidates,
204                        summary.blocked_by_dependencies,
205                        summary.blocked_by_schedule
206                    );
207                    return Ok(());
208                }
209
210                let mode = if opts.wait_when_empty {
211                    WaitMode::EmptyAllowed
212                } else {
213                    WaitMode::BlockedOnly
214                };
215
216                match wait_for_work(
217                    resolved,
218                    include_draft,
219                    mode,
220                    opts.wait_poll_ms,
221                    opts.empty_poll_ms,
222                    opts.wait_timeout_seconds,
223                    opts.notify_when_unblocked,
224                    lifecycle.webhook_context(),
225                )? {
226                    WaitExit::RunnableAvailable {
227                        summary: new_summary,
228                    } => {
229                        log::info!(
230                            "RunLoop: unblocked (ready={}, deps={}, sched={}); continuing",
231                            new_summary.runnable_candidates,
232                            new_summary.blocked_by_dependencies,
233                            new_summary.blocked_by_schedule
234                        );
235                        if active_blocking.take().is_some() {
236                            emit_blocked_state_cleared(opts.run_event_handler.as_ref());
237                        }
238                    }
239                    WaitExit::QueueStillIdle { state } => {
240                        log::info!("{}", state.message);
241                        return Ok(());
242                    }
243                    WaitExit::TimedOut { state } => {
244                        log::info!(
245                            "RunLoop: end (wait timeout reached while {})",
246                            state.message
247                        );
248                        return Ok(());
249                    }
250                    WaitExit::StopRequested { state } => {
251                        if let Some(state) = state {
252                            log::info!(
253                                "RunLoop: end (stop signal received while {})",
254                                state.message
255                            );
256                        } else {
257                            log::info!("RunLoop: end (stop signal received)");
258                        }
259                        return Ok(());
260                    }
261                }
262            }
263            Ok(RunOutcome::Ran { .. }) => {
264                if active_blocking.take().is_some() {
265                    emit_blocked_state_cleared(opts.run_event_handler.as_ref());
266                }
267                lifecycle.record_success()
268            }
269            Err(err) => {
270                if let Some(reason) = runutil::abort_reason(&err) {
271                    match reason {
272                        runutil::RunAbortReason::Interrupted => {
273                            log::info!("RunLoop: aborting after interrupt");
274                        }
275                        runutil::RunAbortReason::UserRevert => {
276                            log::info!("RunLoop: aborting after user-requested revert");
277                        }
278                    }
279                    return Err(err);
280                }
281
282                if is_queue_lock_already_held_error(&err) {
283                    if let Some(state) = queue_lock_blocking_state(&resolved.repo_root, &err) {
284                        emit_blocked_state_changed(&state, opts.run_event_handler.as_ref());
285                    }
286                    log::error!("RunLoop: aborting due to queue lock contention");
287                    return Err(err);
288                }
289                if runutil::is_dirty_repo_error(&err) {
290                    log::error!("RunLoop: aborting due to dirty repository");
291                    return Err(err);
292                }
293                if runutil::is_queue_validation_error(&err) {
294                    log::error!("RunLoop: aborting due to queue validation error");
295                    return Err(err);
296                }
297                if let Some(ci_failure) =
298                    err.downcast_ref::<crate::commands::run::supervision::CiFailure>()
299                {
300                    emit_blocked_state_changed(
301                        &ci_failure.blocking_state(),
302                        opts.run_event_handler.as_ref(),
303                    );
304                }
305
306                lifecycle.record_failure(&err)?;
307            }
308        }
309    }
310}