ralph/commands/run/run_loop/
orchestration.rs1use anyhow::Result;
16
17use crate::config;
18use crate::contracts::TaskStatus;
19use crate::{queue, runutil};
20
21use super::lifecycle::LoopLifecycle;
22use super::session::resolve_resume_state;
23use super::types::RunLoopOptions;
24use super::wait::{WaitExit, WaitMode, wait_for_work};
25use crate::commands::run::queue_lock::{
26 clear_stale_queue_lock_for_resume, is_queue_lock_already_held_error, queue_lock_blocking_state,
27};
28use crate::commands::run::run_one::{RunOneResumeOptions, RunOutcome, run_one_with_handlers};
29use crate::commands::run::{emit_blocked_state_changed, emit_blocked_state_cleared};
30
31pub fn run_loop(resolved: &config::Resolved, opts: RunLoopOptions) -> Result<()> {
32 if let Some(result) = maybe_run_parallel(resolved, &opts)? {
33 return result;
34 }
35
36 let queue_file = queue::load_queue(&resolved.queue_path)?;
37 let include_draft = opts.agent_overrides.include_draft.unwrap_or(false);
38 let resume_state = resolve_resume_state(resolved, &opts)?;
39
40 if resume_state.resume_task_id.is_some()
41 && let Err(err) = clear_stale_queue_lock_for_resume(&resolved.repo_root)
42 {
43 log::warn!("Failed to clear stale queue lock for resume: {}", err);
44 }
45
46 let initial_todo_count = queue_file
47 .tasks
48 .iter()
49 .filter(|task| {
50 task.status == TaskStatus::Todo || (include_draft && task.status == TaskStatus::Draft)
51 })
52 .count() as u32;
53
54 if initial_todo_count == 0 && resume_state.resume_task_id.is_none() {
55 if include_draft {
56 log::info!("No todo or draft tasks found.");
57 } else {
58 log::info!("No todo tasks found.");
59 }
60 if !opts.wait_when_empty {
61 return Ok(());
62 }
63 }
64
65 let label = format!(
66 "RunLoop (todo={initial_todo_count}, max_tasks={})",
67 opts.max_tasks
68 );
69 let mut lifecycle =
70 LoopLifecycle::start(resolved, initial_todo_count, resume_state.completed_count);
71
72 let result = crate::commands::run::logging::with_scope(&label, || {
73 run_loop_state_machine(
74 resolved,
75 &opts,
76 include_draft,
77 resume_state.resume_task_id.as_deref(),
78 &mut lifecycle,
79 )
80 });
81
82 lifecycle.finish(resolved, &opts, &result);
83 result
84}
85
86fn maybe_run_parallel(
87 resolved: &config::Resolved,
88 opts: &RunLoopOptions,
89) -> Result<Option<Result<()>>> {
90 let parallel_workers = opts.parallel_workers.or(resolved.config.parallel.workers);
91 if let Some(workers) = parallel_workers
92 && workers >= 2
93 {
94 if opts.auto_resume {
95 log::warn!("Parallel run ignores --resume; starting a fresh parallel loop.");
96 }
97 if opts.starting_completed != 0 {
98 log::warn!("Parallel run ignores starting_completed; counters will start at zero.");
99 }
100 return Ok(Some(crate::commands::run::parallel::run_loop_parallel(
101 resolved,
102 crate::commands::run::parallel::ParallelRunOptions {
103 max_tasks: opts.max_tasks,
104 workers,
105 agent_overrides: opts.agent_overrides.clone(),
106 force: opts.force,
107 },
108 )));
109 }
110
111 Ok(None)
112}
113
114fn run_loop_state_machine(
115 resolved: &config::Resolved,
116 opts: &RunLoopOptions,
117 include_draft: bool,
118 resume_task_id: Option<&str>,
119 lifecycle: &mut LoopLifecycle,
120) -> Result<()> {
121 let mut pending_resume_task_id = resume_task_id.map(str::to_string);
122 let mut active_blocking = None;
123
124 loop {
125 if lifecycle.max_tasks_reached(opts) {
126 log::info!(
127 "RunLoop: end (reached max task limit: {})",
128 lifecycle.completed()
129 );
130 return Ok(());
131 }
132
133 if lifecycle.stop_requested() {
134 log::info!("Stop signal detected; no new tasks will be started.");
135 lifecycle.clear_stop_signal();
136 return Ok(());
137 }
138
139 match run_one_with_handlers(
140 resolved,
141 &opts.agent_overrides,
142 opts.force,
143 RunOneResumeOptions::resolved(pending_resume_task_id.take()),
144 None,
145 opts.run_event_handler.clone(),
146 ) {
147 Ok(RunOutcome::NoCandidates) => {
148 let idle_state = crate::contracts::BlockingState::idle(include_draft)
149 .with_observed_at(crate::timeutil::now_utc_rfc3339_or_fallback());
150 active_blocking = Some(idle_state.clone());
151 emit_blocked_state_changed(&idle_state, opts.run_event_handler.as_ref());
152
153 if !opts.wait_when_empty {
154 log::info!("{}", idle_state.message);
155 return Ok(());
156 }
157 match wait_for_work(
158 resolved,
159 include_draft,
160 WaitMode::EmptyAllowed,
161 opts.wait_poll_ms,
162 opts.empty_poll_ms,
163 0,
164 opts.notify_when_unblocked,
165 lifecycle.webhook_context(),
166 )? {
167 WaitExit::RunnableAvailable { .. } => {
168 log::info!("RunLoop: new runnable tasks detected; continuing");
169 if active_blocking.take().is_some() {
170 emit_blocked_state_cleared(opts.run_event_handler.as_ref());
171 }
172 }
173 WaitExit::QueueStillIdle { state } => {
174 log::info!("{}", state.message);
175 return Ok(());
176 }
177 WaitExit::TimedOut { state } => {
178 log::info!(
179 "RunLoop: end (wait timeout reached while {})",
180 state.message
181 );
182 return Ok(());
183 }
184 WaitExit::StopRequested { state } => {
185 if let Some(state) = state {
186 log::info!(
187 "RunLoop: end (stop signal received while {})",
188 state.message
189 );
190 } else {
191 log::info!("RunLoop: end (stop signal received)");
192 }
193 return Ok(());
194 }
195 }
196 }
197 Ok(RunOutcome::Blocked { summary, state }) => {
198 active_blocking = Some((*state).clone());
199
200 if !(opts.wait_when_blocked || opts.wait_when_empty) {
201 log::info!(
202 "{} (ready={} deps={} sched={})",
203 state.message,
204 summary.runnable_candidates,
205 summary.blocked_by_dependencies,
206 summary.blocked_by_schedule
207 );
208 return Ok(());
209 }
210
211 let mode = if opts.wait_when_empty {
212 WaitMode::EmptyAllowed
213 } else {
214 WaitMode::BlockedOnly
215 };
216
217 match wait_for_work(
218 resolved,
219 include_draft,
220 mode,
221 opts.wait_poll_ms,
222 opts.empty_poll_ms,
223 opts.wait_timeout_seconds,
224 opts.notify_when_unblocked,
225 lifecycle.webhook_context(),
226 )? {
227 WaitExit::RunnableAvailable {
228 summary: new_summary,
229 } => {
230 log::info!(
231 "RunLoop: unblocked (ready={}, deps={}, sched={}); continuing",
232 new_summary.runnable_candidates,
233 new_summary.blocked_by_dependencies,
234 new_summary.blocked_by_schedule
235 );
236 if active_blocking.take().is_some() {
237 emit_blocked_state_cleared(opts.run_event_handler.as_ref());
238 }
239 }
240 WaitExit::QueueStillIdle { state } => {
241 log::info!("{}", state.message);
242 return Ok(());
243 }
244 WaitExit::TimedOut { state } => {
245 log::info!(
246 "RunLoop: end (wait timeout reached while {})",
247 state.message
248 );
249 return Ok(());
250 }
251 WaitExit::StopRequested { state } => {
252 if let Some(state) = state {
253 log::info!(
254 "RunLoop: end (stop signal received while {})",
255 state.message
256 );
257 } else {
258 log::info!("RunLoop: end (stop signal received)");
259 }
260 return Ok(());
261 }
262 }
263 }
264 Ok(RunOutcome::Ran { .. }) => {
265 if active_blocking.take().is_some() {
266 emit_blocked_state_cleared(opts.run_event_handler.as_ref());
267 }
268 lifecycle.record_success()
269 }
270 Err(err) => {
271 if let Some(reason) = runutil::abort_reason(&err) {
272 match reason {
273 runutil::RunAbortReason::Interrupted => {
274 log::info!("RunLoop: aborting after interrupt");
275 }
276 runutil::RunAbortReason::UserRevert => {
277 log::info!("RunLoop: aborting after user-requested revert");
278 }
279 }
280 return Err(err);
281 }
282
283 if is_queue_lock_already_held_error(&err) {
284 if let Some(state) = queue_lock_blocking_state(&resolved.repo_root, &err) {
285 emit_blocked_state_changed(&state, opts.run_event_handler.as_ref());
286 }
287 log::error!("RunLoop: aborting due to queue lock contention");
288 return Err(err);
289 }
290 if runutil::is_dirty_repo_error(&err) {
291 log::error!("RunLoop: aborting due to dirty repository");
292 return Err(err);
293 }
294 if runutil::is_queue_validation_error(&err) {
295 log::error!("RunLoop: aborting due to queue validation error");
296 return Err(err);
297 }
298 if let Some(ci_failure) =
299 err.downcast_ref::<crate::commands::run::supervision::CiFailure>()
300 {
301 emit_blocked_state_changed(
302 &ci_failure.blocking_state(),
303 opts.run_event_handler.as_ref(),
304 );
305 }
306
307 lifecycle.record_failure(&err)?;
308 return Err(err);
309 }
310 }
311 }
312}