Skip to main content

ralph_api/
loop_domain.rs

1use std::collections::HashSet;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::Command;
5
6use chrono::Utc;
7use ralph_core::{
8    LoopLock, LoopRegistry, MergeButtonState, MergeQueue, MergeState, RegistryError,
9    merge_button_state, remove_worktree,
10};
11use serde::{Deserialize, Serialize};
12
13use crate::errors::ApiError;
14use crate::loop_side_effects::{resolve_discard_target, resolve_loop_root, spawn_retry_merge_flow};
15use crate::loop_support::{
16    current_commit, is_pid_alive, loop_not_found_error, map_merge_error, map_worktree_error, now_ts,
17};
18use crate::task_domain::{TaskCreateParams, TaskDomain};
19
20#[derive(Debug, Clone, Deserialize)]
21#[serde(rename_all = "camelCase")]
22pub struct LoopListParams {
23    pub include_terminal: Option<bool>,
24}
25#[derive(Debug, Clone, Deserialize)]
26#[serde(rename_all = "camelCase")]
27pub struct LoopRetryParams {
28    pub id: String,
29    pub steering_input: Option<String>,
30}
31#[derive(Debug, Clone, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub struct LoopStopMergeParams {
34    pub id: String,
35    pub force: Option<bool>,
36}
37#[derive(Debug, Clone, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub struct LoopTriggerMergeTaskParams {
40    pub loop_id: String,
41}
42#[derive(Debug, Clone, Serialize)]
43#[serde(rename_all = "camelCase")]
44pub struct LoopRecord {
45    pub id: String,
46    pub status: String,
47    pub location: String,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub prompt: Option<String>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub merge_commit: Option<String>,
52}
53#[derive(Debug, Clone, Serialize)]
54#[serde(rename_all = "camelCase")]
55pub struct LoopStatusResult {
56    pub running: bool,
57    pub interval_ms: u64,
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub last_processed_at: Option<String>,
60}
61#[derive(Debug, Clone, Serialize)]
62#[serde(rename_all = "camelCase")]
63pub struct MergeButtonStateResult {
64    pub enabled: bool,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub reason: Option<String>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub action: Option<String>,
69}
70#[derive(Debug, Clone, Serialize)]
71#[serde(rename_all = "camelCase")]
72pub struct TriggerMergeTaskResult {
73    pub success: bool,
74    pub task_id: String,
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub queued_task_id: Option<String>,
77}
78
79pub struct LoopDomain {
80    workspace_root: PathBuf,
81    process_interval_ms: u64,
82    ralph_command: String,
83    last_processed_at: Option<String>,
84}
85
86impl LoopDomain {
87    pub fn new(
88        workspace_root: impl AsRef<Path>,
89        process_interval_ms: u64,
90        ralph_command: impl Into<String>,
91    ) -> Self {
92        Self {
93            workspace_root: workspace_root.as_ref().to_path_buf(),
94            process_interval_ms,
95            ralph_command: ralph_command.into(),
96            last_processed_at: None,
97        }
98    }
99    pub fn list(&self, params: LoopListParams) -> Result<Vec<LoopRecord>, ApiError> {
100        let include_terminal = params.include_terminal.unwrap_or(false);
101        let registry = LoopRegistry::new(&self.workspace_root);
102        let merge_queue = MergeQueue::new(&self.workspace_root);
103
104        let mut loops = Vec::new();
105        let mut listed_ids = HashSet::new();
106
107        if let Ok(Some(metadata)) = LoopLock::read_existing(&self.workspace_root)
108            && is_pid_alive(metadata.pid)
109        {
110            loops.push(LoopRecord {
111                id: "(primary)".to_string(),
112                status: "running".to_string(),
113                location: "(in-place)".to_string(),
114                prompt: Some(metadata.prompt),
115                merge_commit: None,
116            });
117            listed_ids.insert("(primary)".to_string());
118        }
119
120        let registry_entries = registry
121            .list()
122            .map_err(|error| ApiError::internal(format!("failed listing loops: {error}")))?;
123
124        for entry in registry_entries {
125            let status = if entry.is_alive() {
126                "running"
127            } else if entry.is_pid_alive() {
128                "orphan"
129            } else {
130                "crashed"
131            };
132
133            let location = entry
134                .worktree_path
135                .clone()
136                .unwrap_or_else(|| "(in-place)".to_string());
137
138            listed_ids.insert(entry.id.clone());
139            loops.push(LoopRecord {
140                id: entry.id,
141                status: status.to_string(),
142                location,
143                prompt: Some(entry.prompt),
144                merge_commit: None,
145            });
146        }
147
148        for entry in merge_queue
149            .list()
150            .map_err(|error| ApiError::internal(format!("failed reading merge queue: {error}")))?
151        {
152            if listed_ids.contains(&entry.loop_id) {
153                continue;
154            }
155
156            let status = match entry.state {
157                MergeState::Queued => "queued",
158                MergeState::Merging => "merging",
159                MergeState::Merged => "merged",
160                MergeState::NeedsReview => "needs-review",
161                MergeState::Discarded => "discarded",
162            };
163
164            loops.push(LoopRecord {
165                id: entry.loop_id,
166                status: status.to_string(),
167                location: entry
168                    .merge_commit
169                    .clone()
170                    .unwrap_or_else(|| "-".to_string()),
171                prompt: Some(entry.prompt),
172                merge_commit: entry.merge_commit,
173            });
174        }
175
176        if !include_terminal {
177            loops.retain(|loop_info| !matches!(loop_info.status.as_str(), "merged" | "discarded"));
178        }
179
180        Ok(loops)
181    }
182    pub fn status(&self) -> LoopStatusResult {
183        let running = LoopLock::is_locked(&self.workspace_root).unwrap_or(false);
184        LoopStatusResult {
185            running,
186            interval_ms: self.process_interval_ms,
187            last_processed_at: self.last_processed_at.clone(),
188        }
189    }
190    pub fn process(&mut self) -> Result<(), ApiError> {
191        let queue = MergeQueue::new(&self.workspace_root);
192        let pending_entries = queue
193            .list_by_state(MergeState::Queued)
194            .map_err(map_merge_error)?;
195
196        if pending_entries.is_empty() {
197            self.last_processed_at = Some(now_ts());
198            return Ok(());
199        }
200
201        let status = Command::new(&self.ralph_command)
202            .args(["loops", "process"])
203            .current_dir(&self.workspace_root)
204            .status()
205            .map_err(|error| {
206                ApiError::internal(format!(
207                    "failed invoking '{}' for loop.process: {error}",
208                    self.ralph_command
209                ))
210            })?;
211
212        if !status.success() {
213            return Err(ApiError::internal(format!(
214                "loop.process command '{}' exited with status {status}",
215                self.ralph_command
216            )));
217        }
218
219        self.last_processed_at = Some(now_ts());
220        Ok(())
221    }
222    pub fn prune(&self) -> Result<(), ApiError> {
223        let registry = LoopRegistry::new(&self.workspace_root);
224        registry
225            .clean_stale()
226            .map_err(|error| ApiError::internal(format!("failed pruning stale loops: {error}")))?;
227        Ok(())
228    }
229    pub fn retry(&self, params: LoopRetryParams) -> Result<(), ApiError> {
230        if let Some(steering_input) = params.steering_input
231            && !steering_input.trim().is_empty()
232        {
233            let steering_path = self.workspace_root.join(".ralph/merge-steering.txt");
234            if let Some(parent) = steering_path.parent() {
235                fs::create_dir_all(parent).map_err(|error| {
236                    ApiError::internal(format!(
237                        "failed creating merge steering directory '{}': {error}",
238                        parent.display()
239                    ))
240                })?;
241            }
242            fs::write(&steering_path, steering_input.trim()).map_err(|error| {
243                ApiError::internal(format!(
244                    "failed writing merge steering file '{}': {error}",
245                    steering_path.display()
246                ))
247            })?;
248        }
249
250        let queue = MergeQueue::new(&self.workspace_root);
251        let entry = queue
252            .get_entry(&params.id)
253            .map_err(map_merge_error)?
254            .ok_or_else(|| loop_not_found_error(&params.id))?;
255
256        if entry.state != MergeState::NeedsReview {
257            return Err(ApiError::precondition_failed(format!(
258                "Loop '{}' is in state {:?}, can only retry 'needs-review' loops",
259                params.id, entry.state
260            )));
261        }
262
263        spawn_retry_merge_flow(&self.workspace_root, &self.ralph_command, &params.id)
264    }
265
266    pub fn discard(&self, id: &str) -> Result<(), ApiError> {
267        let resolved = resolve_discard_target(&self.workspace_root, id)?;
268        let queue = MergeQueue::new(&self.workspace_root);
269        let registry = LoopRegistry::new(&self.workspace_root);
270
271        if queue
272            .get_entry(&resolved.id)
273            .map_err(map_merge_error)?
274            .is_some()
275        {
276            queue
277                .discard(&resolved.id, Some("User requested discard"))
278                .map_err(map_merge_error)?;
279        }
280
281        match registry.deregister(&resolved.id) {
282            Ok(()) | Err(RegistryError::NotFound(_)) => {}
283            Err(error) => {
284                return Err(ApiError::internal(format!(
285                    "failed deregistering loop '{}': {error}",
286                    resolved.id
287                )));
288            }
289        }
290
291        if let Some(worktree_path) = resolved.worktree_path {
292            remove_worktree(&self.workspace_root, &worktree_path)
293                .map_err(|error| map_worktree_error(&resolved.id, error))?;
294        }
295
296        Ok(())
297    }
298    pub fn stop(&self, params: LoopStopMergeParams) -> Result<(), ApiError> {
299        let target_root = resolve_loop_root(&self.workspace_root, &params.id)?;
300        let lock_metadata = LoopLock::read_existing(&target_root)
301            .map_err(|error| ApiError::internal(format!("failed reading loop lock: {error}")))?
302            .ok_or_else(|| loop_not_found_error(&params.id))?;
303
304        if params.force.unwrap_or(false) {
305            if !is_pid_alive(lock_metadata.pid) {
306                return Err(ApiError::precondition_failed(format!(
307                    "Loop '{}' is not running (process {} not found)",
308                    params.id, lock_metadata.pid
309                )));
310            }
311
312            let status = Command::new("kill")
313                .args(["-9", &lock_metadata.pid.to_string()])
314                .status()
315                .map_err(|error| {
316                    ApiError::internal(format!(
317                        "failed sending force stop signal to process {}: {error}",
318                        lock_metadata.pid
319                    ))
320                })?;
321
322            if !status.success() {
323                return Err(ApiError::internal(format!(
324                    "failed force-stopping process {}",
325                    lock_metadata.pid
326                )));
327            }
328
329            return Ok(());
330        }
331
332        let stop_path = target_root.join(".ralph/stop-requested");
333        if let Some(parent) = stop_path.parent() {
334            fs::create_dir_all(parent).map_err(|error| {
335                ApiError::internal(format!(
336                    "failed creating stop marker directory '{}': {error}",
337                    parent.display()
338                ))
339            })?;
340        }
341
342        fs::write(&stop_path, "").map_err(|error| {
343            ApiError::internal(format!(
344                "failed writing stop marker '{}': {error}",
345                stop_path.display()
346            ))
347        })?;
348
349        Ok(())
350    }
351    pub fn merge(&self, params: LoopStopMergeParams) -> Result<(), ApiError> {
352        let queue = MergeQueue::new(&self.workspace_root);
353        let entry = queue
354            .get_entry(&params.id)
355            .map_err(map_merge_error)?
356            .ok_or_else(|| loop_not_found_error(&params.id))?;
357
358        match entry.state {
359            MergeState::Merged => {
360                return Err(ApiError::precondition_failed(format!(
361                    "Loop '{}' is already merged",
362                    params.id
363                )));
364            }
365            MergeState::Discarded => {
366                return Err(ApiError::precondition_failed(format!(
367                    "Loop '{}' is discarded",
368                    params.id
369                )));
370            }
371            MergeState::Merging if !params.force.unwrap_or(false) => {
372                return Err(ApiError::precondition_failed(format!(
373                    "Loop '{}' is currently merging. Use force=true to override.",
374                    params.id
375                )));
376            }
377            _ => {}
378        }
379
380        if entry.state != MergeState::Merging {
381            queue
382                .mark_merging(&params.id, std::process::id())
383                .map_err(map_merge_error)?;
384        }
385
386        queue
387            .mark_merged(&params.id, &current_commit(&self.workspace_root))
388            .map_err(map_merge_error)
389    }
390    pub fn merge_button_state(&self, id: &str) -> Result<MergeButtonStateResult, ApiError> {
391        match merge_button_state(&self.workspace_root, id).map_err(map_merge_error)? {
392            MergeButtonState::Active => Ok(MergeButtonStateResult {
393                enabled: true,
394                reason: None,
395                action: Some("merge".to_string()),
396            }),
397            MergeButtonState::Blocked { reason } => Ok(MergeButtonStateResult {
398                enabled: false,
399                reason: Some(reason),
400                action: Some("wait".to_string()),
401            }),
402        }
403    }
404    pub fn trigger_merge_task(
405        &self,
406        params: LoopTriggerMergeTaskParams,
407        tasks: &mut TaskDomain,
408    ) -> Result<TriggerMergeTaskResult, ApiError> {
409        let loop_info = self
410            .list(LoopListParams {
411                include_terminal: Some(true),
412            })?
413            .into_iter()
414            .find(|loop_info| loop_info.id == params.loop_id)
415            .ok_or_else(|| loop_not_found_error(&params.loop_id))?;
416
417        if loop_info.location == "(in-place)" {
418            return Err(ApiError::invalid_params(
419                "Cannot trigger merge for in-place loop (primary)",
420            ));
421        }
422
423        let loop_prompt = loop_info
424            .prompt
425            .clone()
426            .unwrap_or_else(|| "(no prompt recorded)".to_string());
427
428        let merge_prompt = format!(
429            "Merge worktree loop '{}' into main branch.\n\nThe worktree is located at: {}\nOriginal task: {}\n\nInstructions:\n1. Review the commits in the worktree branch\n2. Merge the changes into main branch\n3. Resolve any conflicts if present\n4. Delete the worktree after successful merge",
430            params.loop_id, loop_info.location, loop_prompt
431        );
432
433        let task_id = format!("merge-{}-{}", params.loop_id, Utc::now().timestamp_millis());
434        let task = tasks.create(TaskCreateParams {
435            id: task_id,
436            title: format!(
437                "Merge: {}",
438                loop_info
439                    .prompt
440                    .unwrap_or_else(|| params.loop_id.clone())
441                    .chars()
442                    .take(50)
443                    .collect::<String>()
444            ),
445            status: Some("open".to_string()),
446            priority: Some(1),
447            blocked_by: None,
448            auto_execute: Some(true),
449            merge_loop_prompt: Some(merge_prompt),
450        })?;
451
452        Ok(TriggerMergeTaskResult {
453            success: true,
454            task_id: task.id,
455            queued_task_id: task.queued_task_id,
456        })
457    }
458}