ralph/commands/run/run_loop/
orchestration.rs1use anyhow::Result;
16
17use crate::config;
18use crate::contracts::TaskStatus;
19use crate::{queue, runutil};
20
21use super::lifecycle::LoopLifecycle;
22use super::session::resolve_resume_state;
23use super::types::RunLoopOptions;
24use super::wait::{WaitExit, WaitMode, wait_for_work};
25use crate::commands::run::queue_lock::{
26 clear_stale_queue_lock_for_resume, is_queue_lock_already_held_error, queue_lock_blocking_state,
27};
28use crate::commands::run::run_one::{RunOneResumeOptions, RunOutcome, run_one_with_handlers};
29use crate::commands::run::{emit_blocked_state_changed, emit_blocked_state_cleared};
30
31pub fn run_loop(resolved: &config::Resolved, opts: RunLoopOptions) -> Result<()> {
32 if let Some(result) = maybe_run_parallel(resolved, &opts)? {
33 return result;
34 }
35
36 let queue_file = queue::load_queue(&resolved.queue_path)?;
37 let include_draft = opts.agent_overrides.include_draft.unwrap_or(false);
38 let resume_state = resolve_resume_state(resolved, &opts)?;
39
40 if resume_state.resume_task_id.is_some()
41 && let Err(err) = clear_stale_queue_lock_for_resume(&resolved.repo_root)
42 {
43 log::warn!("Failed to clear stale queue lock for resume: {}", err);
44 }
45
46 let initial_todo_count = queue_file
47 .tasks
48 .iter()
49 .filter(|task| {
50 task.status == TaskStatus::Todo || (include_draft && task.status == TaskStatus::Draft)
51 })
52 .count() as u32;
53
54 if initial_todo_count == 0 && resume_state.resume_task_id.is_none() {
55 if include_draft {
56 log::info!("No todo or draft tasks found.");
57 } else {
58 log::info!("No todo tasks found.");
59 }
60 if !opts.wait_when_empty {
61 return Ok(());
62 }
63 }
64
65 let label = format!(
66 "RunLoop (todo={initial_todo_count}, max_tasks={})",
67 opts.max_tasks
68 );
69 let mut lifecycle =
70 LoopLifecycle::start(resolved, initial_todo_count, resume_state.completed_count);
71
72 let result = crate::commands::run::logging::with_scope(&label, || {
73 run_loop_state_machine(
74 resolved,
75 &opts,
76 include_draft,
77 resume_state.resume_task_id.as_deref(),
78 &mut lifecycle,
79 )
80 });
81
82 lifecycle.finish(resolved, &opts, &result);
83 result
84}
85
86fn maybe_run_parallel(
87 resolved: &config::Resolved,
88 opts: &RunLoopOptions,
89) -> Result<Option<Result<()>>> {
90 let parallel_workers = opts.parallel_workers.or(resolved.config.parallel.workers);
91 if let Some(workers) = parallel_workers
92 && workers >= 2
93 {
94 if opts.auto_resume {
95 log::warn!("Parallel run ignores --resume; starting a fresh parallel loop.");
96 }
97 if opts.starting_completed != 0 {
98 log::warn!("Parallel run ignores starting_completed; counters will start at zero.");
99 }
100 return Ok(Some(crate::commands::run::parallel::run_loop_parallel(
101 resolved,
102 crate::commands::run::parallel::ParallelRunOptions {
103 max_tasks: opts.max_tasks,
104 workers,
105 agent_overrides: opts.agent_overrides.clone(),
106 force: opts.force,
107 },
108 )));
109 }
110
111 Ok(None)
112}
113
114fn run_loop_state_machine(
115 resolved: &config::Resolved,
116 opts: &RunLoopOptions,
117 include_draft: bool,
118 resume_task_id: Option<&str>,
119 lifecycle: &mut LoopLifecycle,
120) -> Result<()> {
121 let mut pending_resume_task_id = resume_task_id.map(str::to_string);
122 let mut active_blocking = None;
123
124 loop {
125 if lifecycle.max_tasks_reached(opts) {
126 log::info!(
127 "RunLoop: end (reached max task limit: {})",
128 lifecycle.completed()
129 );
130 return Ok(());
131 }
132
133 if lifecycle.stop_requested() {
134 log::info!("Stop signal detected; no new tasks will be started.");
135 lifecycle.clear_stop_signal();
136 return Ok(());
137 }
138
139 match run_one_with_handlers(
140 resolved,
141 &opts.agent_overrides,
142 opts.force,
143 RunOneResumeOptions::resolved(pending_resume_task_id.take()),
144 None,
145 opts.run_event_handler.clone(),
146 ) {
147 Ok(RunOutcome::NoCandidates) => {
148 let idle_state = crate::contracts::BlockingState::idle(include_draft);
149 active_blocking = Some(idle_state.clone());
150 emit_blocked_state_changed(&idle_state, opts.run_event_handler.as_ref());
151
152 if !opts.wait_when_empty {
153 log::info!("{}", idle_state.message);
154 return Ok(());
155 }
156 match wait_for_work(
157 resolved,
158 include_draft,
159 WaitMode::EmptyAllowed,
160 opts.wait_poll_ms,
161 opts.empty_poll_ms,
162 0,
163 opts.notify_when_unblocked,
164 lifecycle.webhook_context(),
165 )? {
166 WaitExit::RunnableAvailable { .. } => {
167 log::info!("RunLoop: new runnable tasks detected; continuing");
168 if active_blocking.take().is_some() {
169 emit_blocked_state_cleared(opts.run_event_handler.as_ref());
170 }
171 }
172 WaitExit::QueueStillIdle { state } => {
173 log::info!("{}", state.message);
174 return Ok(());
175 }
176 WaitExit::TimedOut { state } => {
177 log::info!(
178 "RunLoop: end (wait timeout reached while {})",
179 state.message
180 );
181 return Ok(());
182 }
183 WaitExit::StopRequested { state } => {
184 if let Some(state) = state {
185 log::info!(
186 "RunLoop: end (stop signal received while {})",
187 state.message
188 );
189 } else {
190 log::info!("RunLoop: end (stop signal received)");
191 }
192 return Ok(());
193 }
194 }
195 }
196 Ok(RunOutcome::Blocked { summary, state }) => {
197 active_blocking = Some((*state).clone());
198
199 if !(opts.wait_when_blocked || opts.wait_when_empty) {
200 log::info!(
201 "{} (ready={} deps={} sched={})",
202 state.message,
203 summary.runnable_candidates,
204 summary.blocked_by_dependencies,
205 summary.blocked_by_schedule
206 );
207 return Ok(());
208 }
209
210 let mode = if opts.wait_when_empty {
211 WaitMode::EmptyAllowed
212 } else {
213 WaitMode::BlockedOnly
214 };
215
216 match wait_for_work(
217 resolved,
218 include_draft,
219 mode,
220 opts.wait_poll_ms,
221 opts.empty_poll_ms,
222 opts.wait_timeout_seconds,
223 opts.notify_when_unblocked,
224 lifecycle.webhook_context(),
225 )? {
226 WaitExit::RunnableAvailable {
227 summary: new_summary,
228 } => {
229 log::info!(
230 "RunLoop: unblocked (ready={}, deps={}, sched={}); continuing",
231 new_summary.runnable_candidates,
232 new_summary.blocked_by_dependencies,
233 new_summary.blocked_by_schedule
234 );
235 if active_blocking.take().is_some() {
236 emit_blocked_state_cleared(opts.run_event_handler.as_ref());
237 }
238 }
239 WaitExit::QueueStillIdle { state } => {
240 log::info!("{}", state.message);
241 return Ok(());
242 }
243 WaitExit::TimedOut { state } => {
244 log::info!(
245 "RunLoop: end (wait timeout reached while {})",
246 state.message
247 );
248 return Ok(());
249 }
250 WaitExit::StopRequested { state } => {
251 if let Some(state) = state {
252 log::info!(
253 "RunLoop: end (stop signal received while {})",
254 state.message
255 );
256 } else {
257 log::info!("RunLoop: end (stop signal received)");
258 }
259 return Ok(());
260 }
261 }
262 }
263 Ok(RunOutcome::Ran { .. }) => {
264 if active_blocking.take().is_some() {
265 emit_blocked_state_cleared(opts.run_event_handler.as_ref());
266 }
267 lifecycle.record_success()
268 }
269 Err(err) => {
270 if let Some(reason) = runutil::abort_reason(&err) {
271 match reason {
272 runutil::RunAbortReason::Interrupted => {
273 log::info!("RunLoop: aborting after interrupt");
274 }
275 runutil::RunAbortReason::UserRevert => {
276 log::info!("RunLoop: aborting after user-requested revert");
277 }
278 }
279 return Err(err);
280 }
281
282 if is_queue_lock_already_held_error(&err) {
283 if let Some(state) = queue_lock_blocking_state(&resolved.repo_root, &err) {
284 emit_blocked_state_changed(&state, opts.run_event_handler.as_ref());
285 }
286 log::error!("RunLoop: aborting due to queue lock contention");
287 return Err(err);
288 }
289 if runutil::is_dirty_repo_error(&err) {
290 log::error!("RunLoop: aborting due to dirty repository");
291 return Err(err);
292 }
293 if runutil::is_queue_validation_error(&err) {
294 log::error!("RunLoop: aborting due to queue validation error");
295 return Err(err);
296 }
297 if let Some(ci_failure) =
298 err.downcast_ref::<crate::commands::run::supervision::CiFailure>()
299 {
300 emit_blocked_state_changed(
301 &ci_failure.blocking_state(),
302 opts.run_event_handler.as_ref(),
303 );
304 }
305
306 lifecycle.record_failure(&err)?;
307 }
308 }
309 }
310}