Skip to main content

ralph_core/
merge_queue.rs

1//! Merge queue for tracking parallel loop merges.
2//!
3//! The merge queue maintains an append-only log of merge events, tracking
4//! loops from completion through successful merge or failure. It uses JSONL
5//! format for durability and easy debugging.
6//!
7//! # Design
8//!
9//! - **JSONL persistence**: Append-only log at `.ralph/merge-queue.jsonl`
10//! - **File locking**: Uses `flock()` for concurrent access safety
11//! - **Event sourcing**: State is derived from event history
12//!
13//! # Example
14//!
15//! ```no_run
16//! use ralph_core::merge_queue::{MergeQueue, MergeQueueError};
17//!
18//! fn main() -> Result<(), MergeQueueError> {
19//!     let queue = MergeQueue::new(".");
20//!
21//!     // Queue a completed loop for merge
22//!     queue.enqueue("ralph-20250124-a3f2", "implement auth")?;
23//!
24//!     // Get next pending loop
25//!     if let Some(entry) = queue.next_pending()? {
26//!         // Mark as merging
27//!         queue.mark_merging(&entry.loop_id, std::process::id())?;
28//!
29//!         // ... perform merge ...
30//!
31//!         // Mark result
32//!         queue.mark_merged(&entry.loop_id, "abc123def")?;
33//!     }
34//!
35//!     Ok(())
36//! }
37//! ```
38
39use crate::loop_lock::LoopLock;
40use crate::text::truncate_with_ellipsis;
41use chrono::{DateTime, Utc};
42use serde::{Deserialize, Serialize};
43use std::fs::{self, File, OpenOptions};
44use std::io::{self, BufRead, BufReader, Seek, SeekFrom, Write};
45use std::path::{Path, PathBuf};
46use std::process::Command;
47
/// A merge queue event recorded in the JSONL log.
///
/// Each event occupies one line of the queue file: `MergeQueue::append_event`
/// serializes it with `serde_json` and terminates it with a newline, and
/// `MergeQueue::read_all_events` parses the file back one line at a time.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MergeEvent {
    /// Timestamp of the event (UTC).
    ///
    /// For `Queued` events this value becomes the entry's `queued_at`, which is
    /// the key the queue listing is sorted by.
    pub ts: DateTime<Utc>,

    /// Loop ID this event relates to.
    pub loop_id: String,

    /// Type of event, together with its event-specific payload.
    pub event: MergeEventType,
}
60
/// Types of merge events.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// stored in a `type` field using snake_case (`queued`, `merging`, `merged`,
/// `needs_review`, `discarded`) next to the variant's own fields, for example
/// `{"type":"needs_review","reason":"..."}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MergeEventType {
    /// Loop has been queued for merge.
    Queued {
        /// The prompt that was executed in this loop.
        prompt: String,
    },

    /// Merge operation has started.
    Merging {
        /// PID of the merge-ralph process.
        pid: u32,
    },

    /// Merge completed successfully.
    Merged {
        /// The commit SHA of the merge commit.
        commit: String,
    },

    /// Merge failed and needs manual review.
    NeedsReview {
        /// Reason for the failure.
        reason: String,
    },

    /// Loop was manually discarded.
    Discarded {
        /// Reason for discarding (optional).
        reason: Option<String>,
    },
}
95
/// State of the merge button for a loop.
///
/// Returned by `merge_button_state`, which reports `Blocked` while the loop is
/// already in the `Merging` state or while a live primary loop holds the loop
/// lock, and `Active` otherwise.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MergeButtonState {
    /// Merge button is active (can merge now).
    Active,
    /// Merge button is blocked with a reason.
    ///
    /// `reason` is a human-readable explanation of why merging is unavailable.
    Blocked { reason: String },
}
104
/// Decision about whether a merge needs user steering.
///
/// Produced by `merge_needs_steering`. When `needs_input` is `false` the
/// `reason` is empty and `options` contains no entries.
///
/// Implements `PartialEq`/`Eq` (like `MergeButtonState`) so decisions can be
/// compared directly by callers and in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SteeringDecision {
    /// Whether user input is needed.
    pub needs_input: bool,
    /// Reason for needing input (or empty if not needed).
    pub reason: String,
    /// Options for the user to choose from.
    pub options: Vec<MergeOption>,
}

/// An option for merge steering.
///
/// One selectable choice presented to the user as part of a
/// [`SteeringDecision`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeOption {
    /// Label for this option.
    pub label: String,
}
122
/// Lifecycle position of a loop inside the merge queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeState {
    /// The loop is waiting for its turn to be merged.
    Queued,
    /// A merge of the loop is in progress.
    Merging,
    /// The loop was merged successfully.
    Merged,
    /// The merge failed and a human has to look at it.
    NeedsReview,
    /// The user discarded the loop.
    Discarded,
}

impl MergeState {
    /// Reports whether the state is terminal, i.e. no further transitions are
    /// possible.
    ///
    /// `Merged` and `Discarded` are terminal: such loops are finished, need no
    /// more user attention and can be filtered out of UI displays. Every other
    /// state is non-terminal.
    pub fn is_terminal(self) -> bool {
        match self {
            Self::Merged | Self::Discarded => true,
            Self::Queued | Self::Merging | Self::NeedsReview => false,
        }
    }
}
147
/// Summary of a loop's merge status.
///
/// Built by `MergeQueue::derive_state` by replaying every event recorded for
/// the loop. The optional fields are only ever set by the matching event and
/// are not cleared by later events, so they can still hold a value after the
/// loop has moved on to another state.
#[derive(Debug, Clone)]
pub struct MergeEntry {
    /// Loop ID.
    pub loop_id: String,

    /// Original prompt.
    ///
    /// Taken from the most recent `Queued` event; empty if no `Queued` event
    /// has been seen for the loop.
    pub prompt: String,

    /// Current state.
    pub state: MergeState,

    /// When the loop was queued.
    ///
    /// Timestamp of the most recent `Queued` event, or of the first event seen
    /// for the loop when the log contains no `Queued` event for it.
    pub queued_at: DateTime<Utc>,

    /// PID of merge-ralph if merging.
    ///
    /// Set from the most recent `Merging` event.
    pub merge_pid: Option<u32>,

    /// Merge commit SHA if merged.
    pub merge_commit: Option<String>,

    /// Failure reason if needs_review.
    ///
    /// Set from the most recent `NeedsReview` event.
    pub failure_reason: Option<String>,

    /// Discard reason if discarded.
    pub discard_reason: Option<String>,
}
175
/// Errors that can occur during merge queue operations.
#[derive(Debug, thiserror::Error)]
pub enum MergeQueueError {
    /// IO error during queue operations.
    ///
    /// Also used to report a failed `flock` call and failures to spawn `git`.
    #[error("IO error: {0}")]
    Io(#[from] io::Error),

    /// Failed to parse queue data.
    ///
    /// The message names the offending line of the queue file. The same
    /// variant is returned when an event cannot be serialized for appending.
    #[error("Failed to parse merge queue: {0}")]
    ParseError(String),

    /// Loop entry not found.
    ///
    /// Carries the loop ID that was looked up.
    #[error("Loop not found in queue: {0}")]
    NotFound(String),

    /// Invalid state transition.
    ///
    /// Fields, in order: the loop ID, the loop's current state, and the state
    /// that was requested.
    #[error("Invalid state transition for {0}: cannot transition from {1:?} to {2:?}")]
    InvalidTransition(String, MergeState, MergeState),

    /// Platform not supported.
    ///
    /// Returned by the locking helpers on non-Unix targets.
    #[error("File locking not supported on this platform")]
    UnsupportedPlatform,
}
199
/// Merge queue for tracking parallel loop merges.
///
/// The queue maintains an append-only JSONL log of merge events.
/// State is derived by replaying events for each loop.
///
/// The value itself only stores the location of the log; every operation
/// opens, locks and reads or appends to that file when it is called.
pub struct MergeQueue {
    /// Path to the merge queue file (`QUEUE_FILE` joined onto the workspace root).
    queue_path: PathBuf,
}
208
209impl MergeQueue {
210    /// The relative path to the merge queue file within the workspace.
211    pub const QUEUE_FILE: &'static str = ".ralph/merge-queue.jsonl";
212
213    /// Creates a new merge queue instance for the given workspace.
214    pub fn new(workspace_root: impl AsRef<Path>) -> Self {
215        Self {
216            queue_path: workspace_root.as_ref().join(Self::QUEUE_FILE),
217        }
218    }
219
220    /// Enqueues a completed loop for merging.
221    ///
222    /// # Arguments
223    ///
224    /// * `loop_id` - The loop identifier
225    /// * `prompt` - The prompt that was executed
226    pub fn enqueue(&self, loop_id: &str, prompt: &str) -> Result<(), MergeQueueError> {
227        let event = MergeEvent {
228            ts: Utc::now(),
229            loop_id: loop_id.to_string(),
230            event: MergeEventType::Queued {
231                prompt: prompt.to_string(),
232            },
233        };
234        self.append_event(&event)
235    }
236
237    /// Marks a loop as being merged.
238    ///
239    /// # Arguments
240    ///
241    /// * `loop_id` - The loop identifier
242    /// * `pid` - PID of the merge-ralph process
243    pub fn mark_merging(&self, loop_id: &str, pid: u32) -> Result<(), MergeQueueError> {
244        // Verify loop is in queued or needs_review state
245        let entry = self.get_entry(loop_id)?;
246        match entry {
247            Some(e) if e.state == MergeState::Queued || e.state == MergeState::NeedsReview => {}
248            Some(e) => {
249                return Err(MergeQueueError::InvalidTransition(
250                    loop_id.to_string(),
251                    e.state,
252                    MergeState::Merging,
253                ));
254            }
255            None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
256        }
257
258        let event = MergeEvent {
259            ts: Utc::now(),
260            loop_id: loop_id.to_string(),
261            event: MergeEventType::Merging { pid },
262        };
263        self.append_event(&event)
264    }
265
266    /// Marks a loop as successfully merged.
267    ///
268    /// # Arguments
269    ///
270    /// * `loop_id` - The loop identifier
271    /// * `commit` - The merge commit SHA
272    pub fn mark_merged(&self, loop_id: &str, commit: &str) -> Result<(), MergeQueueError> {
273        // Verify loop is in merging state
274        let entry = self.get_entry(loop_id)?;
275        match entry {
276            Some(e) if e.state == MergeState::Merging => {}
277            Some(e) => {
278                return Err(MergeQueueError::InvalidTransition(
279                    loop_id.to_string(),
280                    e.state,
281                    MergeState::Merged,
282                ));
283            }
284            None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
285        }
286
287        let event = MergeEvent {
288            ts: Utc::now(),
289            loop_id: loop_id.to_string(),
290            event: MergeEventType::Merged {
291                commit: commit.to_string(),
292            },
293        };
294        self.append_event(&event)
295    }
296
297    /// Marks a loop as needing manual review.
298    ///
299    /// # Arguments
300    ///
301    /// * `loop_id` - The loop identifier
302    /// * `reason` - Reason for the failure
303    pub fn mark_needs_review(&self, loop_id: &str, reason: &str) -> Result<(), MergeQueueError> {
304        // Verify loop is in merging state
305        let entry = self.get_entry(loop_id)?;
306        match entry {
307            Some(e) if e.state == MergeState::Merging => {}
308            Some(e) => {
309                return Err(MergeQueueError::InvalidTransition(
310                    loop_id.to_string(),
311                    e.state,
312                    MergeState::NeedsReview,
313                ));
314            }
315            None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
316        }
317
318        let event = MergeEvent {
319            ts: Utc::now(),
320            loop_id: loop_id.to_string(),
321            event: MergeEventType::NeedsReview {
322                reason: reason.to_string(),
323            },
324        };
325        self.append_event(&event)
326    }
327
328    /// Marks a loop as discarded.
329    ///
330    /// # Arguments
331    ///
332    /// * `loop_id` - The loop identifier
333    /// * `reason` - Optional reason for discarding
334    pub fn discard(&self, loop_id: &str, reason: Option<&str>) -> Result<(), MergeQueueError> {
335        // Can discard from queued or needs_review states
336        let entry = self.get_entry(loop_id)?;
337        match entry {
338            Some(e) if e.state == MergeState::Queued || e.state == MergeState::NeedsReview => {}
339            Some(e) => {
340                return Err(MergeQueueError::InvalidTransition(
341                    loop_id.to_string(),
342                    e.state,
343                    MergeState::Discarded,
344                ));
345            }
346            None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
347        }
348
349        let event = MergeEvent {
350            ts: Utc::now(),
351            loop_id: loop_id.to_string(),
352            event: MergeEventType::Discarded {
353                reason: reason.map(String::from),
354            },
355        };
356        self.append_event(&event)
357    }
358
359    /// Gets the next pending loop ready for merge (FIFO order).
360    ///
361    /// Returns the oldest loop in `Queued` state.
362    pub fn next_pending(&self) -> Result<Option<MergeEntry>, MergeQueueError> {
363        let entries = self.list()?;
364        Ok(entries.into_iter().find(|e| e.state == MergeState::Queued))
365    }
366
367    /// Gets the entry for a specific loop.
368    pub fn get_entry(&self, loop_id: &str) -> Result<Option<MergeEntry>, MergeQueueError> {
369        let entries = self.list()?;
370        Ok(entries.into_iter().find(|e| e.loop_id == loop_id))
371    }
372
373    /// Lists all entries in the merge queue.
374    ///
375    /// Returns entries in chronological order (oldest first).
376    pub fn list(&self) -> Result<Vec<MergeEntry>, MergeQueueError> {
377        let events = self.read_all_events()?;
378        Ok(Self::derive_state(&events))
379    }
380
381    /// Lists entries filtered by state.
382    pub fn list_by_state(&self, state: MergeState) -> Result<Vec<MergeEntry>, MergeQueueError> {
383        let entries = self.list()?;
384        Ok(entries.into_iter().filter(|e| e.state == state).collect())
385    }
386
387    /// Reads all events from the queue file.
388    fn read_all_events(&self) -> Result<Vec<MergeEvent>, MergeQueueError> {
389        if !self.queue_path.exists() {
390            return Ok(Vec::new());
391        }
392
393        self.with_shared_lock(|file| {
394            let reader = BufReader::new(file);
395            let mut events = Vec::new();
396
397            for (line_num, line) in reader.lines().enumerate() {
398                let line = line?;
399                if line.trim().is_empty() {
400                    continue;
401                }
402
403                let event: MergeEvent = serde_json::from_str(&line).map_err(|e| {
404                    MergeQueueError::ParseError(format!("Line {}: {}", line_num + 1, e))
405                })?;
406                events.push(event);
407            }
408
409            Ok(events)
410        })
411    }
412
413    /// Derives the current state of all loops from the event history.
414    fn derive_state(events: &[MergeEvent]) -> Vec<MergeEntry> {
415        use std::collections::HashMap;
416
417        // Build up state for each loop
418        let mut loop_states: HashMap<String, MergeEntry> = HashMap::new();
419
420        for event in events {
421            let entry = loop_states
422                .entry(event.loop_id.clone())
423                .or_insert_with(|| MergeEntry {
424                    loop_id: event.loop_id.clone(),
425                    prompt: String::new(),
426                    state: MergeState::Queued,
427                    queued_at: event.ts,
428                    merge_pid: None,
429                    merge_commit: None,
430                    failure_reason: None,
431                    discard_reason: None,
432                });
433
434            match &event.event {
435                MergeEventType::Queued { prompt } => {
436                    entry.prompt = prompt.clone();
437                    entry.state = MergeState::Queued;
438                    entry.queued_at = event.ts;
439                }
440                MergeEventType::Merging { pid } => {
441                    entry.state = MergeState::Merging;
442                    entry.merge_pid = Some(*pid);
443                }
444                MergeEventType::Merged { commit } => {
445                    entry.state = MergeState::Merged;
446                    entry.merge_commit = Some(commit.clone());
447                }
448                MergeEventType::NeedsReview { reason } => {
449                    entry.state = MergeState::NeedsReview;
450                    entry.failure_reason = Some(reason.clone());
451                }
452                MergeEventType::Discarded { reason } => {
453                    entry.state = MergeState::Discarded;
454                    entry.discard_reason = reason.clone();
455                }
456            }
457        }
458
459        // Sort by queued_at to maintain FIFO order
460        let mut entries: Vec<_> = loop_states.into_values().collect();
461        entries.sort_by_key(|a| a.queued_at);
462        entries
463    }
464
465    /// Appends an event to the queue file.
466    fn append_event(&self, event: &MergeEvent) -> Result<(), MergeQueueError> {
467        self.with_exclusive_lock(|mut file| {
468            // Seek to end
469            file.seek(SeekFrom::End(0))?;
470
471            // Write event as JSON line
472            let json = serde_json::to_string(event)
473                .map_err(|e| MergeQueueError::ParseError(e.to_string()))?;
474            writeln!(file, "{}", json)?;
475
476            file.sync_all()?;
477            Ok(())
478        })
479    }
480
481    /// Executes an operation with a shared (read) lock on the queue file.
482    #[cfg(unix)]
483    fn with_shared_lock<T, F>(&self, f: F) -> Result<T, MergeQueueError>
484    where
485        F: FnOnce(&File) -> Result<T, MergeQueueError>,
486    {
487        use nix::fcntl::{Flock, FlockArg};
488
489        let file = File::open(&self.queue_path)?;
490
491        // Acquire shared lock (blocking)
492        let flock = Flock::lock(file, FlockArg::LockShared).map_err(|(_, errno)| {
493            MergeQueueError::Io(io::Error::new(
494                io::ErrorKind::Other,
495                format!("flock failed: {}", errno),
496            ))
497        })?;
498
499        // Get a reference to the inner file
500        use std::os::fd::AsFd;
501        let borrowed_fd = flock.as_fd();
502        let owned_fd = borrowed_fd.try_clone_to_owned()?;
503        let file: File = owned_fd.into();
504
505        f(&file)
506    }
507
508    #[cfg(not(unix))]
509    fn with_shared_lock<T, F>(&self, _f: F) -> Result<T, MergeQueueError>
510    where
511        F: FnOnce(&File) -> Result<T, MergeQueueError>,
512    {
513        Err(MergeQueueError::UnsupportedPlatform)
514    }
515
516    /// Executes an operation with an exclusive (write) lock on the queue file.
517    #[cfg(unix)]
518    fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T, MergeQueueError>
519    where
520        F: FnOnce(File) -> Result<T, MergeQueueError>,
521    {
522        use nix::fcntl::{Flock, FlockArg};
523
524        // Ensure .ralph directory exists
525        if let Some(parent) = self.queue_path.parent() {
526            fs::create_dir_all(parent)?;
527        }
528
529        // Open or create the file
530        let file = OpenOptions::new()
531            .read(true)
532            .write(true)
533            .create(true)
534            .truncate(false)
535            .open(&self.queue_path)?;
536
537        // Acquire exclusive lock (blocking)
538        let flock = Flock::lock(file, FlockArg::LockExclusive).map_err(|(_, errno)| {
539            MergeQueueError::Io(io::Error::new(
540                io::ErrorKind::Other,
541                format!("flock failed: {}", errno),
542            ))
543        })?;
544
545        // Get a clone of the underlying file
546        use std::os::fd::AsFd;
547        let borrowed_fd = flock.as_fd();
548        let owned_fd = borrowed_fd.try_clone_to_owned()?;
549        let file: File = owned_fd.into();
550
551        f(file)
552    }
553
554    #[cfg(not(unix))]
555    fn with_exclusive_lock<T, F>(&self, _f: F) -> Result<T, MergeQueueError>
556    where
557        F: FnOnce(File) -> Result<T, MergeQueueError>,
558    {
559        Err(MergeQueueError::UnsupportedPlatform)
560    }
561}
562
563/// Get the merge button state for a loop.
564///
565/// Determines whether the merge button should be active or blocked based on:
566/// - Whether the primary loop is running
567/// - Whether this loop is already being merged
568pub fn merge_button_state(
569    workspace: &Path,
570    loop_id: &str,
571) -> Result<MergeButtonState, MergeQueueError> {
572    let queue = MergeQueue::new(workspace);
573
574    // Check if this loop is already being merged
575    if let Some(entry) = queue.get_entry(loop_id)?
576        && entry.state == MergeState::Merging
577    {
578        return Ok(MergeButtonState::Blocked {
579            reason: "Merge already in progress".to_string(),
580        });
581    }
582
583    // Check if primary loop is running by checking:
584    // 1. Lock file exists
585    // 2. PID in the file is still alive
586    if let Ok(Some(metadata)) = LoopLock::read_existing(workspace) {
587        // Check if the PID is still running
588        if is_pid_alive(metadata.pid) {
589            return Ok(MergeButtonState::Blocked {
590                reason: format!("primary loop running: {}", metadata.prompt),
591            });
592        }
593    }
594
595    Ok(MergeButtonState::Active)
596}
597
598/// Check if a process with the given PID is still running.
599fn is_pid_alive(pid: u32) -> bool {
600    #[cfg(unix)]
601    {
602        use nix::sys::signal::kill;
603        use nix::unistd::Pid;
604        // Signal 0 (None) doesn't send any signal but checks if the process exists
605        kill(Pid::from_raw(pid as i32), None).is_ok()
606    }
607
608    #[cfg(not(unix))]
609    {
610        // On non-Unix, assume the process is alive if we can't check
611        true
612    }
613}
614
615/// Generate a smart merge summary from worktree commits.
616///
617/// Reads the commit history and generates a concise summary suitable for
618/// the merge commit message (single line, respects 72-char limit when combined
619/// with the loop ID prefix).
620pub fn smart_merge_summary(workspace: &Path, loop_id: &str) -> Result<String, MergeQueueError> {
621    let branch_name = format!("ralph/{}", loop_id);
622
623    // Get commit messages from the branch
624    let output = Command::new("git")
625        .args([
626            "log",
627            "--oneline",
628            "--no-walk=unsorted",
629            &format!("main..{}", branch_name),
630        ])
631        .current_dir(workspace)
632        .output()?;
633
634    let log_output = String::from_utf8_lossy(&output.stdout);
635    let lines: Vec<&str> = log_output.lines().collect();
636
637    // Extract the most meaningful commit message
638    let summary = if lines.is_empty() {
639        // Try getting any commit on the branch
640        let output = Command::new("git")
641            .args(["log", "-1", "--oneline", &branch_name])
642            .current_dir(workspace)
643            .output()?;
644
645        let msg = String::from_utf8_lossy(&output.stdout);
646        extract_summary_from_line(msg.trim())
647    } else {
648        // Use the most recent commit message
649        extract_summary_from_line(lines[0])
650    };
651
652    // Calculate max length: 72 - "merge(ralph): " (14) - " (loop {})" with loop_id
653    let prefix_len = 14; // "merge(ralph): "
654    let suffix_len = 8 + loop_id.len(); // " (loop " + loop_id + ")"
655    let max_summary_len = 72 - prefix_len - suffix_len;
656
657    // Truncate if needed
658    let summary = truncate_with_ellipsis(&summary, max_summary_len);
659
660    Ok(summary)
661}
662
/// Strips the leading commit hash from a `git log --oneline` line.
///
/// Such lines look like `abc1234 commit message`: everything after the first
/// space is returned. A line without any space is returned unchanged.
fn extract_summary_from_line(line: &str) -> String {
    let subject = match line.split_once(' ') {
        Some((_hash, rest)) => rest,
        None => line,
    };
    subject.to_string()
}
672
673/// Check if a merge needs user steering (e.g., due to conflicts).
674pub fn merge_needs_steering(
675    workspace: &Path,
676    loop_id: &str,
677) -> Result<SteeringDecision, MergeQueueError> {
678    let branch_name = format!("ralph/{}", loop_id);
679
680    // Check for potential conflicts by doing a dry-run merge
681    let output = Command::new("git")
682        .args(["merge-tree", "--write-tree", "main", &branch_name])
683        .current_dir(workspace)
684        .output()?;
685
686    // Check if merge-tree reports conflicts (non-zero exit or conflict markers in output)
687    let has_conflicts =
688        !output.status.success() || String::from_utf8_lossy(&output.stdout).contains("CONFLICT");
689
690    if has_conflicts {
691        // Also get list of conflicting files
692        let diff_output = Command::new("git")
693            .args(["diff", "--name-only", "main", &branch_name])
694            .current_dir(workspace)
695            .output()?;
696
697        let files = String::from_utf8_lossy(&diff_output.stdout);
698        let file_list: Vec<&str> = files.lines().take(3).collect();
699
700        let reason = if file_list.is_empty() {
701            "Potential conflict detected".to_string()
702        } else {
703            format!("Files modified on both branches: {}", file_list.join(", "))
704        };
705
706        Ok(SteeringDecision {
707            needs_input: true,
708            reason,
709            options: vec![
710                MergeOption {
711                    label: "Use ours (main)".to_string(),
712                },
713                MergeOption {
714                    label: "Use theirs (branch)".to_string(),
715                },
716                MergeOption {
717                    label: "Manual resolution".to_string(),
718                },
719            ],
720        })
721    } else {
722        Ok(SteeringDecision {
723            needs_input: false,
724            reason: String::new(),
725            options: vec![],
726        })
727    }
728}
729
730/// Generate an execution summary for a completed merge.
731///
732/// Describes what was merged including commit count and key changes.
733pub fn merge_execution_summary(workspace: &Path, loop_id: &str) -> Result<String, MergeQueueError> {
734    let branch_name = format!("ralph/{}", loop_id);
735
736    // Get commit count
737    let count_output = Command::new("git")
738        .args(["rev-list", "--count", &format!("main..{}", branch_name)])
739        .current_dir(workspace)
740        .output()?;
741
742    let commit_count = String::from_utf8_lossy(&count_output.stdout)
743        .trim()
744        .parse::<usize>()
745        .unwrap_or(0);
746
747    // Get file count
748    let files_output = Command::new("git")
749        .args(["diff", "--name-only", "main", &branch_name])
750        .current_dir(workspace)
751        .output()?;
752
753    let files = String::from_utf8_lossy(&files_output.stdout);
754    let file_count = files.lines().count();
755
756    // Get the most descriptive commit message
757    let log_output = Command::new("git")
758        .args(["log", "-1", "--format=%s", &branch_name])
759        .current_dir(workspace)
760        .output()?;
761
762    let last_commit = String::from_utf8_lossy(&log_output.stdout)
763        .trim()
764        .to_string();
765
766    // Build summary
767    let summary = format!(
768        "{} commit{}, {} file{} changed: {}",
769        commit_count,
770        if commit_count == 1 { "" } else { "s" },
771        file_count,
772        if file_count == 1 { "" } else { "s" },
773        last_commit
774    );
775
776    Ok(summary)
777}
778
779#[cfg(test)]
780mod tests {
781    use super::*;
782    use tempfile::TempDir;
783
784    #[test]
785    fn test_enqueue() {
786        let temp_dir = TempDir::new().unwrap();
787        let queue = MergeQueue::new(temp_dir.path());
788
789        queue.enqueue("loop-123", "implement auth").unwrap();
790
791        let entries = queue.list().unwrap();
792        assert_eq!(entries.len(), 1);
793        assert_eq!(entries[0].loop_id, "loop-123");
794        assert_eq!(entries[0].prompt, "implement auth");
795        assert_eq!(entries[0].state, MergeState::Queued);
796    }
797
798    #[test]
799    fn test_full_merge_lifecycle() {
800        let temp_dir = TempDir::new().unwrap();
801        let queue = MergeQueue::new(temp_dir.path());
802
803        // Enqueue
804        queue.enqueue("loop-abc", "test prompt").unwrap();
805        let entry = queue.get_entry("loop-abc").unwrap().unwrap();
806        assert_eq!(entry.state, MergeState::Queued);
807
808        // Start merging
809        queue.mark_merging("loop-abc", 12345).unwrap();
810        let entry = queue.get_entry("loop-abc").unwrap().unwrap();
811        assert_eq!(entry.state, MergeState::Merging);
812        assert_eq!(entry.merge_pid, Some(12345));
813
814        // Complete merge
815        queue.mark_merged("loop-abc", "commit-sha-123").unwrap();
816        let entry = queue.get_entry("loop-abc").unwrap().unwrap();
817        assert_eq!(entry.state, MergeState::Merged);
818        assert_eq!(entry.merge_commit, Some("commit-sha-123".to_string()));
819    }
820
821    #[test]
822    fn test_merge_needs_review() {
823        let temp_dir = TempDir::new().unwrap();
824        let queue = MergeQueue::new(temp_dir.path());
825
826        queue.enqueue("loop-def", "test").unwrap();
827        queue.mark_merging("loop-def", 99999).unwrap();
828        queue
829            .mark_needs_review("loop-def", "Conflicting changes in src/auth.rs")
830            .unwrap();
831
832        let entry = queue.get_entry("loop-def").unwrap().unwrap();
833        assert_eq!(entry.state, MergeState::NeedsReview);
834        assert_eq!(
835            entry.failure_reason,
836            Some("Conflicting changes in src/auth.rs".to_string())
837        );
838    }
839
840    #[test]
841    fn test_discard_from_queued() {
842        let temp_dir = TempDir::new().unwrap();
843        let queue = MergeQueue::new(temp_dir.path());
844
845        queue.enqueue("loop-xyz", "test").unwrap();
846        queue.discard("loop-xyz", Some("No longer needed")).unwrap();
847
848        let entry = queue.get_entry("loop-xyz").unwrap().unwrap();
849        assert_eq!(entry.state, MergeState::Discarded);
850        assert_eq!(entry.discard_reason, Some("No longer needed".to_string()));
851    }
852
853    #[test]
854    fn test_discard_from_needs_review() {
855        let temp_dir = TempDir::new().unwrap();
856        let queue = MergeQueue::new(temp_dir.path());
857
858        queue.enqueue("loop-xyz", "test").unwrap();
859        queue.mark_merging("loop-xyz", 123).unwrap();
860        queue.mark_needs_review("loop-xyz", "conflicts").unwrap();
861        queue.discard("loop-xyz", None).unwrap();
862
863        let entry = queue.get_entry("loop-xyz").unwrap().unwrap();
864        assert_eq!(entry.state, MergeState::Discarded);
865    }
866
867    #[test]
868    fn test_next_pending_fifo() {
869        let temp_dir = TempDir::new().unwrap();
870        let queue = MergeQueue::new(temp_dir.path());
871
872        queue.enqueue("loop-1", "first").unwrap();
873        std::thread::sleep(std::time::Duration::from_millis(10));
874        queue.enqueue("loop-2", "second").unwrap();
875        std::thread::sleep(std::time::Duration::from_millis(10));
876        queue.enqueue("loop-3", "third").unwrap();
877
878        // First pending should be loop-1
879        let pending = queue.next_pending().unwrap().unwrap();
880        assert_eq!(pending.loop_id, "loop-1");
881
882        // Mark loop-1 as merging
883        queue.mark_merging("loop-1", 123).unwrap();
884
885        // Next pending should be loop-2
886        let pending = queue.next_pending().unwrap().unwrap();
887        assert_eq!(pending.loop_id, "loop-2");
888    }
889
890    #[test]
891    fn test_invalid_transition_queued_to_merged() {
892        let temp_dir = TempDir::new().unwrap();
893        let queue = MergeQueue::new(temp_dir.path());
894
895        queue.enqueue("loop-xyz", "test").unwrap();
896
897        // Can't go directly from queued to merged
898        let result = queue.mark_merged("loop-xyz", "commit");
899        assert!(matches!(
900            result,
901            Err(MergeQueueError::InvalidTransition(
902                _,
903                MergeState::Queued,
904                MergeState::Merged
905            ))
906        ));
907    }
908
909    #[test]
910    fn test_invalid_transition_merged_to_needs_review() {
911        let temp_dir = TempDir::new().unwrap();
912        let queue = MergeQueue::new(temp_dir.path());
913
914        queue.enqueue("loop-xyz", "test").unwrap();
915        queue.mark_merging("loop-xyz", 123).unwrap();
916        queue.mark_merged("loop-xyz", "abc").unwrap();
917
918        // Can't go from merged to needs_review
919        let result = queue.mark_needs_review("loop-xyz", "error");
920        assert!(matches!(
921            result,
922            Err(MergeQueueError::InvalidTransition(
923                _,
924                MergeState::Merged,
925                MergeState::NeedsReview
926            ))
927        ));
928    }
929
930    #[test]
931    fn test_not_found() {
932        let temp_dir = TempDir::new().unwrap();
933        let queue = MergeQueue::new(temp_dir.path());
934
935        let result = queue.mark_merging("nonexistent", 123);
936        assert!(matches!(result, Err(MergeQueueError::NotFound(_))));
937    }
938
939    #[test]
940    fn test_retry_from_needs_review() {
941        let temp_dir = TempDir::new().unwrap();
942        let queue = MergeQueue::new(temp_dir.path());
943
944        queue.enqueue("loop-retry", "test").unwrap();
945        queue.mark_merging("loop-retry", 100).unwrap();
946        queue.mark_needs_review("loop-retry", "conflicts").unwrap();
947
948        // Can retry (mark_merging) from needs_review
949        queue.mark_merging("loop-retry", 200).unwrap();
950        let entry = queue.get_entry("loop-retry").unwrap().unwrap();
951        assert_eq!(entry.state, MergeState::Merging);
952        assert_eq!(entry.merge_pid, Some(200));
953    }
954
955    #[test]
956    fn test_list_by_state() {
957        let temp_dir = TempDir::new().unwrap();
958        let queue = MergeQueue::new(temp_dir.path());
959
960        queue.enqueue("loop-1", "test 1").unwrap();
961        queue.enqueue("loop-2", "test 2").unwrap();
962        queue.enqueue("loop-3", "test 3").unwrap();
963
964        queue.mark_merging("loop-1", 123).unwrap();
965        queue.mark_merged("loop-1", "abc").unwrap();
966
967        queue.mark_merging("loop-2", 456).unwrap();
968
969        let queued = queue.list_by_state(MergeState::Queued).unwrap();
970        assert_eq!(queued.len(), 1);
971        assert_eq!(queued[0].loop_id, "loop-3");
972
973        let merging = queue.list_by_state(MergeState::Merging).unwrap();
974        assert_eq!(merging.len(), 1);
975        assert_eq!(merging[0].loop_id, "loop-2");
976
977        let merged = queue.list_by_state(MergeState::Merged).unwrap();
978        assert_eq!(merged.len(), 1);
979        assert_eq!(merged[0].loop_id, "loop-1");
980    }
981
982    #[test]
983    fn test_empty_queue() {
984        let temp_dir = TempDir::new().unwrap();
985        let queue = MergeQueue::new(temp_dir.path());
986
987        let entries = queue.list().unwrap();
988        assert!(entries.is_empty());
989
990        let pending = queue.next_pending().unwrap();
991        assert!(pending.is_none());
992    }
993
994    #[test]
995    fn test_persistence() {
996        let temp_dir = TempDir::new().unwrap();
997
998        {
999            let queue = MergeQueue::new(temp_dir.path());
1000            queue.enqueue("loop-persist", "test persistence").unwrap();
1001        }
1002
1003        // Load again and verify data persisted
1004        {
1005            let queue = MergeQueue::new(temp_dir.path());
1006            let entries = queue.list().unwrap();
1007            assert_eq!(entries.len(), 1);
1008            assert_eq!(entries[0].loop_id, "loop-persist");
1009            assert_eq!(entries[0].prompt, "test persistence");
1010        }
1011    }
1012
1013    #[test]
1014    fn test_event_serialization() {
1015        let event = MergeEvent {
1016            ts: Utc::now(),
1017            loop_id: "loop-test".to_string(),
1018            event: MergeEventType::Queued {
1019                prompt: "test prompt".to_string(),
1020            },
1021        };
1022
1023        let json = serde_json::to_string(&event).unwrap();
1024        let parsed: MergeEvent = serde_json::from_str(&json).unwrap();
1025
1026        assert_eq!(parsed.loop_id, event.loop_id);
1027        match parsed.event {
1028            MergeEventType::Queued { prompt } => assert_eq!(prompt, "test prompt"),
1029            _ => panic!("Wrong event type"),
1030        }
1031    }
1032
1033    #[test]
1034    fn test_creates_ralph_directory() {
1035        let temp_dir = TempDir::new().unwrap();
1036        let ralph_dir = temp_dir.path().join(".ralph");
1037        let queue_file = ralph_dir.join("merge-queue.jsonl");
1038
1039        assert!(!ralph_dir.exists());
1040        assert!(!queue_file.exists());
1041
1042        let queue = MergeQueue::new(temp_dir.path());
1043        queue.enqueue("loop-dir", "test").unwrap();
1044
1045        assert!(ralph_dir.exists());
1046        assert!(queue_file.exists());
1047    }
1048}