1use crate::loop_lock::LoopLock;
40use chrono::{DateTime, Utc};
41use serde::{Deserialize, Serialize};
42use std::fs::{self, File, OpenOptions};
43use std::io::{self, BufRead, BufReader, Seek, SeekFrom, Write};
44use std::path::{Path, PathBuf};
45use std::process::Command;
46
47#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
49pub struct MergeEvent {
50 pub ts: DateTime<Utc>,
52
53 pub loop_id: String,
55
56 pub event: MergeEventType,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
62#[serde(tag = "type", rename_all = "snake_case")]
63pub enum MergeEventType {
64 Queued {
66 prompt: String,
68 },
69
70 Merging {
72 pid: u32,
74 },
75
76 Merged {
78 commit: String,
80 },
81
82 NeedsReview {
84 reason: String,
86 },
87
88 Discarded {
90 reason: Option<String>,
92 },
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum MergeButtonState {
98 Active,
100 Blocked { reason: String },
102}
103
104#[derive(Debug, Clone)]
106pub struct SteeringDecision {
107 pub needs_input: bool,
109 pub reason: String,
111 pub options: Vec<MergeOption>,
113}
114
115#[derive(Debug, Clone)]
117pub struct MergeOption {
118 pub label: String,
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
124pub enum MergeState {
125 Queued,
127 Merging,
129 Merged,
131 NeedsReview,
133 Discarded,
135}
136
137impl MergeState {
138 pub fn is_terminal(self) -> bool {
143 matches!(self, Self::Merged | Self::Discarded)
144 }
145}
146
147#[derive(Debug, Clone)]
149pub struct MergeEntry {
150 pub loop_id: String,
152
153 pub prompt: String,
155
156 pub state: MergeState,
158
159 pub queued_at: DateTime<Utc>,
161
162 pub merge_pid: Option<u32>,
164
165 pub merge_commit: Option<String>,
167
168 pub failure_reason: Option<String>,
170
171 pub discard_reason: Option<String>,
173}
174
175#[derive(Debug, thiserror::Error)]
177pub enum MergeQueueError {
178 #[error("IO error: {0}")]
180 Io(#[from] io::Error),
181
182 #[error("Failed to parse merge queue: {0}")]
184 ParseError(String),
185
186 #[error("Loop not found in queue: {0}")]
188 NotFound(String),
189
190 #[error("Invalid state transition for {0}: cannot transition from {1:?} to {2:?}")]
192 InvalidTransition(String, MergeState, MergeState),
193
194 #[error("File locking not supported on this platform")]
196 UnsupportedPlatform,
197}
198
199pub struct MergeQueue {
204 queue_path: PathBuf,
206}
207
208impl MergeQueue {
209 pub const QUEUE_FILE: &'static str = ".ralph/merge-queue.jsonl";
211
212 pub fn new(workspace_root: impl AsRef<Path>) -> Self {
214 Self {
215 queue_path: workspace_root.as_ref().join(Self::QUEUE_FILE),
216 }
217 }
218
219 pub fn enqueue(&self, loop_id: &str, prompt: &str) -> Result<(), MergeQueueError> {
226 let event = MergeEvent {
227 ts: Utc::now(),
228 loop_id: loop_id.to_string(),
229 event: MergeEventType::Queued {
230 prompt: prompt.to_string(),
231 },
232 };
233 self.append_event(&event)
234 }
235
236 pub fn mark_merging(&self, loop_id: &str, pid: u32) -> Result<(), MergeQueueError> {
243 let entry = self.get_entry(loop_id)?;
245 match entry {
246 Some(e) if e.state == MergeState::Queued || e.state == MergeState::NeedsReview => {}
247 Some(e) => {
248 return Err(MergeQueueError::InvalidTransition(
249 loop_id.to_string(),
250 e.state,
251 MergeState::Merging,
252 ));
253 }
254 None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
255 }
256
257 let event = MergeEvent {
258 ts: Utc::now(),
259 loop_id: loop_id.to_string(),
260 event: MergeEventType::Merging { pid },
261 };
262 self.append_event(&event)
263 }
264
265 pub fn mark_merged(&self, loop_id: &str, commit: &str) -> Result<(), MergeQueueError> {
272 let entry = self.get_entry(loop_id)?;
274 match entry {
275 Some(e) if e.state == MergeState::Merging => {}
276 Some(e) => {
277 return Err(MergeQueueError::InvalidTransition(
278 loop_id.to_string(),
279 e.state,
280 MergeState::Merged,
281 ));
282 }
283 None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
284 }
285
286 let event = MergeEvent {
287 ts: Utc::now(),
288 loop_id: loop_id.to_string(),
289 event: MergeEventType::Merged {
290 commit: commit.to_string(),
291 },
292 };
293 self.append_event(&event)
294 }
295
296 pub fn mark_needs_review(&self, loop_id: &str, reason: &str) -> Result<(), MergeQueueError> {
303 let entry = self.get_entry(loop_id)?;
305 match entry {
306 Some(e) if e.state == MergeState::Merging => {}
307 Some(e) => {
308 return Err(MergeQueueError::InvalidTransition(
309 loop_id.to_string(),
310 e.state,
311 MergeState::NeedsReview,
312 ));
313 }
314 None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
315 }
316
317 let event = MergeEvent {
318 ts: Utc::now(),
319 loop_id: loop_id.to_string(),
320 event: MergeEventType::NeedsReview {
321 reason: reason.to_string(),
322 },
323 };
324 self.append_event(&event)
325 }
326
327 pub fn discard(&self, loop_id: &str, reason: Option<&str>) -> Result<(), MergeQueueError> {
334 let entry = self.get_entry(loop_id)?;
336 match entry {
337 Some(e) if e.state == MergeState::Queued || e.state == MergeState::NeedsReview => {}
338 Some(e) => {
339 return Err(MergeQueueError::InvalidTransition(
340 loop_id.to_string(),
341 e.state,
342 MergeState::Discarded,
343 ));
344 }
345 None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
346 }
347
348 let event = MergeEvent {
349 ts: Utc::now(),
350 loop_id: loop_id.to_string(),
351 event: MergeEventType::Discarded {
352 reason: reason.map(String::from),
353 },
354 };
355 self.append_event(&event)
356 }
357
358 pub fn next_pending(&self) -> Result<Option<MergeEntry>, MergeQueueError> {
362 let entries = self.list()?;
363 Ok(entries.into_iter().find(|e| e.state == MergeState::Queued))
364 }
365
366 pub fn get_entry(&self, loop_id: &str) -> Result<Option<MergeEntry>, MergeQueueError> {
368 let entries = self.list()?;
369 Ok(entries.into_iter().find(|e| e.loop_id == loop_id))
370 }
371
372 pub fn list(&self) -> Result<Vec<MergeEntry>, MergeQueueError> {
376 let events = self.read_all_events()?;
377 Ok(Self::derive_state(&events))
378 }
379
380 pub fn list_by_state(&self, state: MergeState) -> Result<Vec<MergeEntry>, MergeQueueError> {
382 let entries = self.list()?;
383 Ok(entries.into_iter().filter(|e| e.state == state).collect())
384 }
385
386 fn read_all_events(&self) -> Result<Vec<MergeEvent>, MergeQueueError> {
388 if !self.queue_path.exists() {
389 return Ok(Vec::new());
390 }
391
392 self.with_shared_lock(|file| {
393 let reader = BufReader::new(file);
394 let mut events = Vec::new();
395
396 for (line_num, line) in reader.lines().enumerate() {
397 let line = line?;
398 if line.trim().is_empty() {
399 continue;
400 }
401
402 let event: MergeEvent = serde_json::from_str(&line).map_err(|e| {
403 MergeQueueError::ParseError(format!("Line {}: {}", line_num + 1, e))
404 })?;
405 events.push(event);
406 }
407
408 Ok(events)
409 })
410 }
411
412 fn derive_state(events: &[MergeEvent]) -> Vec<MergeEntry> {
414 use std::collections::HashMap;
415
416 let mut loop_states: HashMap<String, MergeEntry> = HashMap::new();
418
419 for event in events {
420 let entry = loop_states
421 .entry(event.loop_id.clone())
422 .or_insert_with(|| MergeEntry {
423 loop_id: event.loop_id.clone(),
424 prompt: String::new(),
425 state: MergeState::Queued,
426 queued_at: event.ts,
427 merge_pid: None,
428 merge_commit: None,
429 failure_reason: None,
430 discard_reason: None,
431 });
432
433 match &event.event {
434 MergeEventType::Queued { prompt } => {
435 entry.prompt = prompt.clone();
436 entry.state = MergeState::Queued;
437 entry.queued_at = event.ts;
438 }
439 MergeEventType::Merging { pid } => {
440 entry.state = MergeState::Merging;
441 entry.merge_pid = Some(*pid);
442 }
443 MergeEventType::Merged { commit } => {
444 entry.state = MergeState::Merged;
445 entry.merge_commit = Some(commit.clone());
446 }
447 MergeEventType::NeedsReview { reason } => {
448 entry.state = MergeState::NeedsReview;
449 entry.failure_reason = Some(reason.clone());
450 }
451 MergeEventType::Discarded { reason } => {
452 entry.state = MergeState::Discarded;
453 entry.discard_reason = reason.clone();
454 }
455 }
456 }
457
458 let mut entries: Vec<_> = loop_states.into_values().collect();
460 entries.sort_by(|a, b| a.queued_at.cmp(&b.queued_at));
461 entries
462 }
463
464 fn append_event(&self, event: &MergeEvent) -> Result<(), MergeQueueError> {
466 self.with_exclusive_lock(|mut file| {
467 file.seek(SeekFrom::End(0))?;
469
470 let json = serde_json::to_string(event)
472 .map_err(|e| MergeQueueError::ParseError(e.to_string()))?;
473 writeln!(file, "{}", json)?;
474
475 file.sync_all()?;
476 Ok(())
477 })
478 }
479
480 #[cfg(unix)]
482 fn with_shared_lock<T, F>(&self, f: F) -> Result<T, MergeQueueError>
483 where
484 F: FnOnce(&File) -> Result<T, MergeQueueError>,
485 {
486 use nix::fcntl::{Flock, FlockArg};
487
488 let file = File::open(&self.queue_path)?;
489
490 let flock = Flock::lock(file, FlockArg::LockShared).map_err(|(_, errno)| {
492 MergeQueueError::Io(io::Error::new(
493 io::ErrorKind::Other,
494 format!("flock failed: {}", errno),
495 ))
496 })?;
497
498 use std::os::fd::AsFd;
500 let borrowed_fd = flock.as_fd();
501 let owned_fd = borrowed_fd.try_clone_to_owned()?;
502 let file: File = owned_fd.into();
503
504 f(&file)
505 }
506
507 #[cfg(not(unix))]
508 fn with_shared_lock<T, F>(&self, _f: F) -> Result<T, MergeQueueError>
509 where
510 F: FnOnce(&File) -> Result<T, MergeQueueError>,
511 {
512 Err(MergeQueueError::UnsupportedPlatform)
513 }
514
515 #[cfg(unix)]
517 fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T, MergeQueueError>
518 where
519 F: FnOnce(File) -> Result<T, MergeQueueError>,
520 {
521 use nix::fcntl::{Flock, FlockArg};
522
523 if let Some(parent) = self.queue_path.parent() {
525 fs::create_dir_all(parent)?;
526 }
527
528 let file = OpenOptions::new()
530 .read(true)
531 .write(true)
532 .create(true)
533 .truncate(false)
534 .open(&self.queue_path)?;
535
536 let flock = Flock::lock(file, FlockArg::LockExclusive).map_err(|(_, errno)| {
538 MergeQueueError::Io(io::Error::new(
539 io::ErrorKind::Other,
540 format!("flock failed: {}", errno),
541 ))
542 })?;
543
544 use std::os::fd::AsFd;
546 let borrowed_fd = flock.as_fd();
547 let owned_fd = borrowed_fd.try_clone_to_owned()?;
548 let file: File = owned_fd.into();
549
550 f(file)
551 }
552
553 #[cfg(not(unix))]
554 fn with_exclusive_lock<T, F>(&self, _f: F) -> Result<T, MergeQueueError>
555 where
556 F: FnOnce(File) -> Result<T, MergeQueueError>,
557 {
558 Err(MergeQueueError::UnsupportedPlatform)
559 }
560}
561
562pub fn merge_button_state(
568 workspace: &Path,
569 loop_id: &str,
570) -> Result<MergeButtonState, MergeQueueError> {
571 let queue = MergeQueue::new(workspace);
572
573 if let Some(entry) = queue.get_entry(loop_id)?
575 && entry.state == MergeState::Merging
576 {
577 return Ok(MergeButtonState::Blocked {
578 reason: "Merge already in progress".to_string(),
579 });
580 }
581
582 if let Ok(Some(metadata)) = LoopLock::read_existing(workspace) {
586 if is_pid_alive(metadata.pid) {
588 return Ok(MergeButtonState::Blocked {
589 reason: format!("primary loop running: {}", metadata.prompt),
590 });
591 }
592 }
593
594 Ok(MergeButtonState::Active)
595}
596
597fn is_pid_alive(pid: u32) -> bool {
599 #[cfg(unix)]
600 {
601 use nix::sys::signal::kill;
602 use nix::unistd::Pid;
603 kill(Pid::from_raw(pid as i32), None).is_ok()
605 }
606
607 #[cfg(not(unix))]
608 {
609 true
611 }
612}
613
614pub fn smart_merge_summary(workspace: &Path, loop_id: &str) -> Result<String, MergeQueueError> {
620 let branch_name = format!("ralph/{}", loop_id);
621
622 let output = Command::new("git")
624 .args([
625 "log",
626 "--oneline",
627 "--no-walk=unsorted",
628 &format!("main..{}", branch_name),
629 ])
630 .current_dir(workspace)
631 .output()?;
632
633 let log_output = String::from_utf8_lossy(&output.stdout);
634 let lines: Vec<&str> = log_output.lines().collect();
635
636 let summary = if lines.is_empty() {
638 let output = Command::new("git")
640 .args(["log", "-1", "--oneline", &branch_name])
641 .current_dir(workspace)
642 .output()?;
643
644 let msg = String::from_utf8_lossy(&output.stdout);
645 extract_summary_from_line(msg.trim())
646 } else {
647 extract_summary_from_line(lines[0])
649 };
650
651 let prefix_len = 14; let suffix_len = 8 + loop_id.len(); let max_summary_len = 72 - prefix_len - suffix_len;
655
656 let summary = if summary.len() > max_summary_len {
658 format!("{}...", &summary[..max_summary_len.saturating_sub(3)])
659 } else {
660 summary
661 };
662
663 Ok(summary)
664}
665
666fn extract_summary_from_line(line: &str) -> String {
668 if let Some(idx) = line.find(' ') {
670 line[idx + 1..].to_string()
671 } else {
672 line.to_string()
673 }
674}
675
676pub fn merge_needs_steering(
678 workspace: &Path,
679 loop_id: &str,
680) -> Result<SteeringDecision, MergeQueueError> {
681 let branch_name = format!("ralph/{}", loop_id);
682
683 let output = Command::new("git")
685 .args(["merge-tree", "--write-tree", "main", &branch_name])
686 .current_dir(workspace)
687 .output()?;
688
689 let has_conflicts =
691 !output.status.success() || String::from_utf8_lossy(&output.stdout).contains("CONFLICT");
692
693 if has_conflicts {
694 let diff_output = Command::new("git")
696 .args(["diff", "--name-only", "main", &branch_name])
697 .current_dir(workspace)
698 .output()?;
699
700 let files = String::from_utf8_lossy(&diff_output.stdout);
701 let file_list: Vec<&str> = files.lines().take(3).collect();
702
703 let reason = if file_list.is_empty() {
704 "Potential conflict detected".to_string()
705 } else {
706 format!("Files modified on both branches: {}", file_list.join(", "))
707 };
708
709 Ok(SteeringDecision {
710 needs_input: true,
711 reason,
712 options: vec![
713 MergeOption {
714 label: "Use ours (main)".to_string(),
715 },
716 MergeOption {
717 label: "Use theirs (branch)".to_string(),
718 },
719 MergeOption {
720 label: "Manual resolution".to_string(),
721 },
722 ],
723 })
724 } else {
725 Ok(SteeringDecision {
726 needs_input: false,
727 reason: String::new(),
728 options: vec![],
729 })
730 }
731}
732
733pub fn merge_execution_summary(workspace: &Path, loop_id: &str) -> Result<String, MergeQueueError> {
737 let branch_name = format!("ralph/{}", loop_id);
738
739 let count_output = Command::new("git")
741 .args(["rev-list", "--count", &format!("main..{}", branch_name)])
742 .current_dir(workspace)
743 .output()?;
744
745 let commit_count = String::from_utf8_lossy(&count_output.stdout)
746 .trim()
747 .parse::<usize>()
748 .unwrap_or(0);
749
750 let files_output = Command::new("git")
752 .args(["diff", "--name-only", "main", &branch_name])
753 .current_dir(workspace)
754 .output()?;
755
756 let files = String::from_utf8_lossy(&files_output.stdout);
757 let file_count = files.lines().count();
758
759 let log_output = Command::new("git")
761 .args(["log", "-1", "--format=%s", &branch_name])
762 .current_dir(workspace)
763 .output()?;
764
765 let last_commit = String::from_utf8_lossy(&log_output.stdout)
766 .trim()
767 .to_string();
768
769 let summary = format!(
771 "{} commit{}, {} file{} changed: {}",
772 commit_count,
773 if commit_count == 1 { "" } else { "s" },
774 file_count,
775 if file_count == 1 { "" } else { "s" },
776 last_commit
777 );
778
779 Ok(summary)
780}
781
782#[cfg(test)]
783mod tests {
784 use super::*;
785 use tempfile::TempDir;
786
787 #[test]
788 fn test_enqueue() {
789 let temp_dir = TempDir::new().unwrap();
790 let queue = MergeQueue::new(temp_dir.path());
791
792 queue.enqueue("loop-123", "implement auth").unwrap();
793
794 let entries = queue.list().unwrap();
795 assert_eq!(entries.len(), 1);
796 assert_eq!(entries[0].loop_id, "loop-123");
797 assert_eq!(entries[0].prompt, "implement auth");
798 assert_eq!(entries[0].state, MergeState::Queued);
799 }
800
801 #[test]
802 fn test_full_merge_lifecycle() {
803 let temp_dir = TempDir::new().unwrap();
804 let queue = MergeQueue::new(temp_dir.path());
805
806 queue.enqueue("loop-abc", "test prompt").unwrap();
808 let entry = queue.get_entry("loop-abc").unwrap().unwrap();
809 assert_eq!(entry.state, MergeState::Queued);
810
811 queue.mark_merging("loop-abc", 12345).unwrap();
813 let entry = queue.get_entry("loop-abc").unwrap().unwrap();
814 assert_eq!(entry.state, MergeState::Merging);
815 assert_eq!(entry.merge_pid, Some(12345));
816
817 queue.mark_merged("loop-abc", "commit-sha-123").unwrap();
819 let entry = queue.get_entry("loop-abc").unwrap().unwrap();
820 assert_eq!(entry.state, MergeState::Merged);
821 assert_eq!(entry.merge_commit, Some("commit-sha-123".to_string()));
822 }
823
824 #[test]
825 fn test_merge_needs_review() {
826 let temp_dir = TempDir::new().unwrap();
827 let queue = MergeQueue::new(temp_dir.path());
828
829 queue.enqueue("loop-def", "test").unwrap();
830 queue.mark_merging("loop-def", 99999).unwrap();
831 queue
832 .mark_needs_review("loop-def", "Conflicting changes in src/auth.rs")
833 .unwrap();
834
835 let entry = queue.get_entry("loop-def").unwrap().unwrap();
836 assert_eq!(entry.state, MergeState::NeedsReview);
837 assert_eq!(
838 entry.failure_reason,
839 Some("Conflicting changes in src/auth.rs".to_string())
840 );
841 }
842
843 #[test]
844 fn test_discard_from_queued() {
845 let temp_dir = TempDir::new().unwrap();
846 let queue = MergeQueue::new(temp_dir.path());
847
848 queue.enqueue("loop-xyz", "test").unwrap();
849 queue.discard("loop-xyz", Some("No longer needed")).unwrap();
850
851 let entry = queue.get_entry("loop-xyz").unwrap().unwrap();
852 assert_eq!(entry.state, MergeState::Discarded);
853 assert_eq!(entry.discard_reason, Some("No longer needed".to_string()));
854 }
855
856 #[test]
857 fn test_discard_from_needs_review() {
858 let temp_dir = TempDir::new().unwrap();
859 let queue = MergeQueue::new(temp_dir.path());
860
861 queue.enqueue("loop-xyz", "test").unwrap();
862 queue.mark_merging("loop-xyz", 123).unwrap();
863 queue.mark_needs_review("loop-xyz", "conflicts").unwrap();
864 queue.discard("loop-xyz", None).unwrap();
865
866 let entry = queue.get_entry("loop-xyz").unwrap().unwrap();
867 assert_eq!(entry.state, MergeState::Discarded);
868 }
869
870 #[test]
871 fn test_next_pending_fifo() {
872 let temp_dir = TempDir::new().unwrap();
873 let queue = MergeQueue::new(temp_dir.path());
874
875 queue.enqueue("loop-1", "first").unwrap();
876 std::thread::sleep(std::time::Duration::from_millis(10));
877 queue.enqueue("loop-2", "second").unwrap();
878 std::thread::sleep(std::time::Duration::from_millis(10));
879 queue.enqueue("loop-3", "third").unwrap();
880
881 let pending = queue.next_pending().unwrap().unwrap();
883 assert_eq!(pending.loop_id, "loop-1");
884
885 queue.mark_merging("loop-1", 123).unwrap();
887
888 let pending = queue.next_pending().unwrap().unwrap();
890 assert_eq!(pending.loop_id, "loop-2");
891 }
892
893 #[test]
894 fn test_invalid_transition_queued_to_merged() {
895 let temp_dir = TempDir::new().unwrap();
896 let queue = MergeQueue::new(temp_dir.path());
897
898 queue.enqueue("loop-xyz", "test").unwrap();
899
900 let result = queue.mark_merged("loop-xyz", "commit");
902 assert!(matches!(
903 result,
904 Err(MergeQueueError::InvalidTransition(
905 _,
906 MergeState::Queued,
907 MergeState::Merged
908 ))
909 ));
910 }
911
912 #[test]
913 fn test_invalid_transition_merged_to_needs_review() {
914 let temp_dir = TempDir::new().unwrap();
915 let queue = MergeQueue::new(temp_dir.path());
916
917 queue.enqueue("loop-xyz", "test").unwrap();
918 queue.mark_merging("loop-xyz", 123).unwrap();
919 queue.mark_merged("loop-xyz", "abc").unwrap();
920
921 let result = queue.mark_needs_review("loop-xyz", "error");
923 assert!(matches!(
924 result,
925 Err(MergeQueueError::InvalidTransition(
926 _,
927 MergeState::Merged,
928 MergeState::NeedsReview
929 ))
930 ));
931 }
932
933 #[test]
934 fn test_not_found() {
935 let temp_dir = TempDir::new().unwrap();
936 let queue = MergeQueue::new(temp_dir.path());
937
938 let result = queue.mark_merging("nonexistent", 123);
939 assert!(matches!(result, Err(MergeQueueError::NotFound(_))));
940 }
941
942 #[test]
943 fn test_retry_from_needs_review() {
944 let temp_dir = TempDir::new().unwrap();
945 let queue = MergeQueue::new(temp_dir.path());
946
947 queue.enqueue("loop-retry", "test").unwrap();
948 queue.mark_merging("loop-retry", 100).unwrap();
949 queue.mark_needs_review("loop-retry", "conflicts").unwrap();
950
951 queue.mark_merging("loop-retry", 200).unwrap();
953 let entry = queue.get_entry("loop-retry").unwrap().unwrap();
954 assert_eq!(entry.state, MergeState::Merging);
955 assert_eq!(entry.merge_pid, Some(200));
956 }
957
958 #[test]
959 fn test_list_by_state() {
960 let temp_dir = TempDir::new().unwrap();
961 let queue = MergeQueue::new(temp_dir.path());
962
963 queue.enqueue("loop-1", "test 1").unwrap();
964 queue.enqueue("loop-2", "test 2").unwrap();
965 queue.enqueue("loop-3", "test 3").unwrap();
966
967 queue.mark_merging("loop-1", 123).unwrap();
968 queue.mark_merged("loop-1", "abc").unwrap();
969
970 queue.mark_merging("loop-2", 456).unwrap();
971
972 let queued = queue.list_by_state(MergeState::Queued).unwrap();
973 assert_eq!(queued.len(), 1);
974 assert_eq!(queued[0].loop_id, "loop-3");
975
976 let merging = queue.list_by_state(MergeState::Merging).unwrap();
977 assert_eq!(merging.len(), 1);
978 assert_eq!(merging[0].loop_id, "loop-2");
979
980 let merged = queue.list_by_state(MergeState::Merged).unwrap();
981 assert_eq!(merged.len(), 1);
982 assert_eq!(merged[0].loop_id, "loop-1");
983 }
984
985 #[test]
986 fn test_empty_queue() {
987 let temp_dir = TempDir::new().unwrap();
988 let queue = MergeQueue::new(temp_dir.path());
989
990 let entries = queue.list().unwrap();
991 assert!(entries.is_empty());
992
993 let pending = queue.next_pending().unwrap();
994 assert!(pending.is_none());
995 }
996
997 #[test]
998 fn test_persistence() {
999 let temp_dir = TempDir::new().unwrap();
1000
1001 {
1002 let queue = MergeQueue::new(temp_dir.path());
1003 queue.enqueue("loop-persist", "test persistence").unwrap();
1004 }
1005
1006 {
1008 let queue = MergeQueue::new(temp_dir.path());
1009 let entries = queue.list().unwrap();
1010 assert_eq!(entries.len(), 1);
1011 assert_eq!(entries[0].loop_id, "loop-persist");
1012 assert_eq!(entries[0].prompt, "test persistence");
1013 }
1014 }
1015
1016 #[test]
1017 fn test_event_serialization() {
1018 let event = MergeEvent {
1019 ts: Utc::now(),
1020 loop_id: "loop-test".to_string(),
1021 event: MergeEventType::Queued {
1022 prompt: "test prompt".to_string(),
1023 },
1024 };
1025
1026 let json = serde_json::to_string(&event).unwrap();
1027 let parsed: MergeEvent = serde_json::from_str(&json).unwrap();
1028
1029 assert_eq!(parsed.loop_id, event.loop_id);
1030 match parsed.event {
1031 MergeEventType::Queued { prompt } => assert_eq!(prompt, "test prompt"),
1032 _ => panic!("Wrong event type"),
1033 }
1034 }
1035
1036 #[test]
1037 fn test_creates_ralph_directory() {
1038 let temp_dir = TempDir::new().unwrap();
1039 let ralph_dir = temp_dir.path().join(".ralph");
1040 let queue_file = ralph_dir.join("merge-queue.jsonl");
1041
1042 assert!(!ralph_dir.exists());
1043 assert!(!queue_file.exists());
1044
1045 let queue = MergeQueue::new(temp_dir.path());
1046 queue.enqueue("loop-dir", "test").unwrap();
1047
1048 assert!(ralph_dir.exists());
1049 assert!(queue_file.exists());
1050 }
1051}