1use std::collections::HashSet;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::Command;
5
6use chrono::Utc;
7use ralph_core::{
8 LoopLock, LoopRegistry, MergeButtonState, MergeQueue, MergeState, RegistryError,
9 merge_button_state, remove_worktree,
10};
11use serde::{Deserialize, Serialize};
12
13use crate::errors::ApiError;
14use crate::loop_side_effects::{resolve_discard_target, resolve_loop_root, spawn_retry_merge_flow};
15use crate::loop_support::{
16 current_commit, is_pid_alive, loop_not_found_error, map_merge_error, map_worktree_error, now_ts,
17};
18use crate::task_domain::{TaskCreateParams, TaskDomain};
19
20#[derive(Debug, Clone, Deserialize)]
21#[serde(rename_all = "camelCase")]
22pub struct LoopListParams {
23 pub include_terminal: Option<bool>,
24}
25#[derive(Debug, Clone, Deserialize)]
26#[serde(rename_all = "camelCase")]
27pub struct LoopRetryParams {
28 pub id: String,
29 pub steering_input: Option<String>,
30}
31#[derive(Debug, Clone, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub struct LoopStopMergeParams {
34 pub id: String,
35 pub force: Option<bool>,
36}
37#[derive(Debug, Clone, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub struct LoopTriggerMergeTaskParams {
40 pub loop_id: String,
41}
42#[derive(Debug, Clone, Serialize)]
43#[serde(rename_all = "camelCase")]
44pub struct LoopRecord {
45 pub id: String,
46 pub status: String,
47 pub location: String,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub prompt: Option<String>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub merge_commit: Option<String>,
52}
53#[derive(Debug, Clone, Serialize)]
54#[serde(rename_all = "camelCase")]
55pub struct LoopStatusResult {
56 pub running: bool,
57 pub interval_ms: u64,
58 #[serde(skip_serializing_if = "Option::is_none")]
59 pub last_processed_at: Option<String>,
60}
61#[derive(Debug, Clone, Serialize)]
62#[serde(rename_all = "camelCase")]
63pub struct MergeButtonStateResult {
64 pub enabled: bool,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub reason: Option<String>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub action: Option<String>,
69}
70#[derive(Debug, Clone, Serialize)]
71#[serde(rename_all = "camelCase")]
72pub struct TriggerMergeTaskResult {
73 pub success: bool,
74 pub task_id: String,
75 #[serde(skip_serializing_if = "Option::is_none")]
76 pub queued_task_id: Option<String>,
77}
78
79pub struct LoopDomain {
80 workspace_root: PathBuf,
81 process_interval_ms: u64,
82 ralph_command: String,
83 last_processed_at: Option<String>,
84}
85
86impl LoopDomain {
87 pub fn new(
88 workspace_root: impl AsRef<Path>,
89 process_interval_ms: u64,
90 ralph_command: impl Into<String>,
91 ) -> Self {
92 Self {
93 workspace_root: workspace_root.as_ref().to_path_buf(),
94 process_interval_ms,
95 ralph_command: ralph_command.into(),
96 last_processed_at: None,
97 }
98 }
99 pub fn list(&self, params: LoopListParams) -> Result<Vec<LoopRecord>, ApiError> {
100 let include_terminal = params.include_terminal.unwrap_or(false);
101 let registry = LoopRegistry::new(&self.workspace_root);
102 let merge_queue = MergeQueue::new(&self.workspace_root);
103
104 let mut loops = Vec::new();
105 let mut listed_ids = HashSet::new();
106
107 if let Ok(Some(metadata)) = LoopLock::read_existing(&self.workspace_root)
108 && is_pid_alive(metadata.pid)
109 {
110 loops.push(LoopRecord {
111 id: "(primary)".to_string(),
112 status: "running".to_string(),
113 location: "(in-place)".to_string(),
114 prompt: Some(metadata.prompt),
115 merge_commit: None,
116 });
117 listed_ids.insert("(primary)".to_string());
118 }
119
120 let registry_entries = registry
121 .list()
122 .map_err(|error| ApiError::internal(format!("failed listing loops: {error}")))?;
123
124 for entry in registry_entries {
125 let status = if entry.is_alive() {
126 "running"
127 } else if entry.is_pid_alive() {
128 "orphan"
129 } else {
130 "crashed"
131 };
132
133 let location = entry
134 .worktree_path
135 .clone()
136 .unwrap_or_else(|| "(in-place)".to_string());
137
138 listed_ids.insert(entry.id.clone());
139 loops.push(LoopRecord {
140 id: entry.id,
141 status: status.to_string(),
142 location,
143 prompt: Some(entry.prompt),
144 merge_commit: None,
145 });
146 }
147
148 for entry in merge_queue
149 .list()
150 .map_err(|error| ApiError::internal(format!("failed reading merge queue: {error}")))?
151 {
152 if listed_ids.contains(&entry.loop_id) {
153 continue;
154 }
155
156 let status = match entry.state {
157 MergeState::Queued => "queued",
158 MergeState::Merging => "merging",
159 MergeState::Merged => "merged",
160 MergeState::NeedsReview => "needs-review",
161 MergeState::Discarded => "discarded",
162 };
163
164 loops.push(LoopRecord {
165 id: entry.loop_id,
166 status: status.to_string(),
167 location: entry
168 .merge_commit
169 .clone()
170 .unwrap_or_else(|| "-".to_string()),
171 prompt: Some(entry.prompt),
172 merge_commit: entry.merge_commit,
173 });
174 }
175
176 if !include_terminal {
177 loops.retain(|loop_info| !matches!(loop_info.status.as_str(), "merged" | "discarded"));
178 }
179
180 Ok(loops)
181 }
182 pub fn status(&self) -> LoopStatusResult {
183 let running = LoopLock::is_locked(&self.workspace_root).unwrap_or(false);
184 LoopStatusResult {
185 running,
186 interval_ms: self.process_interval_ms,
187 last_processed_at: self.last_processed_at.clone(),
188 }
189 }
190 pub fn process(&mut self) -> Result<(), ApiError> {
191 let queue = MergeQueue::new(&self.workspace_root);
192 let pending_entries = queue
193 .list_by_state(MergeState::Queued)
194 .map_err(map_merge_error)?;
195
196 if pending_entries.is_empty() {
197 self.last_processed_at = Some(now_ts());
198 return Ok(());
199 }
200
201 let status = Command::new(&self.ralph_command)
202 .args(["loops", "process"])
203 .current_dir(&self.workspace_root)
204 .status()
205 .map_err(|error| {
206 ApiError::internal(format!(
207 "failed invoking '{}' for loop.process: {error}",
208 self.ralph_command
209 ))
210 })?;
211
212 if !status.success() {
213 return Err(ApiError::internal(format!(
214 "loop.process command '{}' exited with status {status}",
215 self.ralph_command
216 )));
217 }
218
219 self.last_processed_at = Some(now_ts());
220 Ok(())
221 }
222 pub fn prune(&self) -> Result<(), ApiError> {
223 let registry = LoopRegistry::new(&self.workspace_root);
224 registry
225 .clean_stale()
226 .map_err(|error| ApiError::internal(format!("failed pruning stale loops: {error}")))?;
227 Ok(())
228 }
229 pub fn retry(&self, params: LoopRetryParams) -> Result<(), ApiError> {
230 if let Some(steering_input) = params.steering_input
231 && !steering_input.trim().is_empty()
232 {
233 let steering_path = self.workspace_root.join(".ralph/merge-steering.txt");
234 if let Some(parent) = steering_path.parent() {
235 fs::create_dir_all(parent).map_err(|error| {
236 ApiError::internal(format!(
237 "failed creating merge steering directory '{}': {error}",
238 parent.display()
239 ))
240 })?;
241 }
242 fs::write(&steering_path, steering_input.trim()).map_err(|error| {
243 ApiError::internal(format!(
244 "failed writing merge steering file '{}': {error}",
245 steering_path.display()
246 ))
247 })?;
248 }
249
250 let queue = MergeQueue::new(&self.workspace_root);
251 let entry = queue
252 .get_entry(¶ms.id)
253 .map_err(map_merge_error)?
254 .ok_or_else(|| loop_not_found_error(¶ms.id))?;
255
256 if entry.state != MergeState::NeedsReview {
257 return Err(ApiError::precondition_failed(format!(
258 "Loop '{}' is in state {:?}, can only retry 'needs-review' loops",
259 params.id, entry.state
260 )));
261 }
262
263 spawn_retry_merge_flow(&self.workspace_root, &self.ralph_command, ¶ms.id)
264 }
265
266 pub fn discard(&self, id: &str) -> Result<(), ApiError> {
267 let resolved = resolve_discard_target(&self.workspace_root, id)?;
268 let queue = MergeQueue::new(&self.workspace_root);
269 let registry = LoopRegistry::new(&self.workspace_root);
270
271 if queue
272 .get_entry(&resolved.id)
273 .map_err(map_merge_error)?
274 .is_some()
275 {
276 queue
277 .discard(&resolved.id, Some("User requested discard"))
278 .map_err(map_merge_error)?;
279 }
280
281 match registry.deregister(&resolved.id) {
282 Ok(()) | Err(RegistryError::NotFound(_)) => {}
283 Err(error) => {
284 return Err(ApiError::internal(format!(
285 "failed deregistering loop '{}': {error}",
286 resolved.id
287 )));
288 }
289 }
290
291 if let Some(worktree_path) = resolved.worktree_path {
292 remove_worktree(&self.workspace_root, &worktree_path)
293 .map_err(|error| map_worktree_error(&resolved.id, error))?;
294 }
295
296 Ok(())
297 }
298 pub fn stop(&self, params: LoopStopMergeParams) -> Result<(), ApiError> {
299 let target_root = resolve_loop_root(&self.workspace_root, ¶ms.id)?;
300 let lock_metadata = LoopLock::read_existing(&target_root)
301 .map_err(|error| ApiError::internal(format!("failed reading loop lock: {error}")))?
302 .ok_or_else(|| loop_not_found_error(¶ms.id))?;
303
304 if params.force.unwrap_or(false) {
305 if !is_pid_alive(lock_metadata.pid) {
306 return Err(ApiError::precondition_failed(format!(
307 "Loop '{}' is not running (process {} not found)",
308 params.id, lock_metadata.pid
309 )));
310 }
311
312 let status = Command::new("kill")
313 .args(["-9", &lock_metadata.pid.to_string()])
314 .status()
315 .map_err(|error| {
316 ApiError::internal(format!(
317 "failed sending force stop signal to process {}: {error}",
318 lock_metadata.pid
319 ))
320 })?;
321
322 if !status.success() {
323 return Err(ApiError::internal(format!(
324 "failed force-stopping process {}",
325 lock_metadata.pid
326 )));
327 }
328
329 return Ok(());
330 }
331
332 let stop_path = target_root.join(".ralph/stop-requested");
333 if let Some(parent) = stop_path.parent() {
334 fs::create_dir_all(parent).map_err(|error| {
335 ApiError::internal(format!(
336 "failed creating stop marker directory '{}': {error}",
337 parent.display()
338 ))
339 })?;
340 }
341
342 fs::write(&stop_path, "").map_err(|error| {
343 ApiError::internal(format!(
344 "failed writing stop marker '{}': {error}",
345 stop_path.display()
346 ))
347 })?;
348
349 Ok(())
350 }
351 pub fn merge(&self, params: LoopStopMergeParams) -> Result<(), ApiError> {
352 let queue = MergeQueue::new(&self.workspace_root);
353 let entry = queue
354 .get_entry(¶ms.id)
355 .map_err(map_merge_error)?
356 .ok_or_else(|| loop_not_found_error(¶ms.id))?;
357
358 match entry.state {
359 MergeState::Merged => {
360 return Err(ApiError::precondition_failed(format!(
361 "Loop '{}' is already merged",
362 params.id
363 )));
364 }
365 MergeState::Discarded => {
366 return Err(ApiError::precondition_failed(format!(
367 "Loop '{}' is discarded",
368 params.id
369 )));
370 }
371 MergeState::Merging if !params.force.unwrap_or(false) => {
372 return Err(ApiError::precondition_failed(format!(
373 "Loop '{}' is currently merging. Use force=true to override.",
374 params.id
375 )));
376 }
377 _ => {}
378 }
379
380 if entry.state != MergeState::Merging {
381 queue
382 .mark_merging(¶ms.id, std::process::id())
383 .map_err(map_merge_error)?;
384 }
385
386 queue
387 .mark_merged(¶ms.id, ¤t_commit(&self.workspace_root))
388 .map_err(map_merge_error)
389 }
390 pub fn merge_button_state(&self, id: &str) -> Result<MergeButtonStateResult, ApiError> {
391 match merge_button_state(&self.workspace_root, id).map_err(map_merge_error)? {
392 MergeButtonState::Active => Ok(MergeButtonStateResult {
393 enabled: true,
394 reason: None,
395 action: Some("merge".to_string()),
396 }),
397 MergeButtonState::Blocked { reason } => Ok(MergeButtonStateResult {
398 enabled: false,
399 reason: Some(reason),
400 action: Some("wait".to_string()),
401 }),
402 }
403 }
404 pub fn trigger_merge_task(
405 &self,
406 params: LoopTriggerMergeTaskParams,
407 tasks: &mut TaskDomain,
408 ) -> Result<TriggerMergeTaskResult, ApiError> {
409 let loop_info = self
410 .list(LoopListParams {
411 include_terminal: Some(true),
412 })?
413 .into_iter()
414 .find(|loop_info| loop_info.id == params.loop_id)
415 .ok_or_else(|| loop_not_found_error(¶ms.loop_id))?;
416
417 if loop_info.location == "(in-place)" {
418 return Err(ApiError::invalid_params(
419 "Cannot trigger merge for in-place loop (primary)",
420 ));
421 }
422
423 let loop_prompt = loop_info
424 .prompt
425 .clone()
426 .unwrap_or_else(|| "(no prompt recorded)".to_string());
427
428 let merge_prompt = format!(
429 "Merge worktree loop '{}' into main branch.\n\nThe worktree is located at: {}\nOriginal task: {}\n\nInstructions:\n1. Review the commits in the worktree branch\n2. Merge the changes into main branch\n3. Resolve any conflicts if present\n4. Delete the worktree after successful merge",
430 params.loop_id, loop_info.location, loop_prompt
431 );
432
433 let task_id = format!("merge-{}-{}", params.loop_id, Utc::now().timestamp_millis());
434 let task = tasks.create(TaskCreateParams {
435 id: task_id,
436 title: format!(
437 "Merge: {}",
438 loop_info
439 .prompt
440 .unwrap_or_else(|| params.loop_id.clone())
441 .chars()
442 .take(50)
443 .collect::<String>()
444 ),
445 status: Some("open".to_string()),
446 priority: Some(1),
447 blocked_by: None,
448 auto_execute: Some(true),
449 merge_loop_prompt: Some(merge_prompt),
450 })?;
451
452 Ok(TriggerMergeTaskResult {
453 success: true,
454 task_id: task.id,
455 queued_task_id: task.queued_task_id,
456 })
457 }
458}