1use crate::loop_lock::LoopLock;
40use crate::text::truncate_with_ellipsis;
41use chrono::{DateTime, Utc};
42use serde::{Deserialize, Serialize};
43use std::fs::{self, File, OpenOptions};
44use std::io::{self, BufRead, BufReader, Seek, SeekFrom, Write};
45use std::path::{Path, PathBuf};
46use std::process::Command;
47
/// A single timestamped record in the merge queue log.
///
/// Events are serialized with `serde_json` and appended one per line to the
/// queue file; the current state of each loop is reconstructed by replaying
/// them in order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MergeEvent {
    /// When the event was recorded (UTC).
    pub ts: DateTime<Utc>,

    /// Identifier of the loop this event refers to.
    pub loop_id: String,

    /// What happened to the loop.
    pub event: MergeEventType,
}
60
/// The kind of change recorded by a [`MergeEvent`].
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field in `snake_case`, next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MergeEventType {
    /// The loop was added to the queue.
    Queued {
        /// Prompt associated with the loop.
        prompt: String,
    },

    /// A merge of the loop was started.
    Merging {
        /// Process id recorded for the merge.
        pid: u32,
    },

    /// The merge completed.
    Merged {
        /// Commit recorded for the completed merge.
        commit: String,
    },

    /// The merge could not complete and needs attention.
    NeedsReview {
        /// Why review is needed.
        reason: String,
    },

    /// The loop was dropped from the queue.
    Discarded {
        /// Optional explanation for the discard.
        reason: Option<String>,
    },
}
95
/// Result of [`merge_button_state`]: whether a merge may be started right now.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MergeButtonState {
    /// Nothing blocks starting a merge.
    Active,
    /// A merge must not be started; `reason` is a human-readable explanation.
    Blocked { reason: String },
}
104
/// Result of [`merge_needs_steering`]: whether a merge needs a decision from
/// the user before it can proceed.
#[derive(Debug, Clone)]
pub struct SteeringDecision {
    /// `true` when a potential conflict was detected.
    pub needs_input: bool,
    /// Explanation of the detected problem; empty when `needs_input` is `false`.
    pub reason: String,
    /// Choices to present to the user; empty when `needs_input` is `false`.
    pub options: Vec<MergeOption>,
}
115
/// One choice offered in a [`SteeringDecision`].
#[derive(Debug, Clone)]
pub struct MergeOption {
    /// Text shown for this choice.
    pub label: String,
}
122
/// Current position of a loop in the merge workflow, as derived from its
/// event history.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeState {
    /// Waiting in the queue.
    Queued,
    /// A merge has been started.
    Merging,
    /// The merge completed.
    Merged,
    /// The merge did not complete and needs attention.
    NeedsReview,
    /// The loop was dropped from the queue.
    Discarded,
}

impl MergeState {
    /// Returns `true` for the end states [`MergeState::Merged`] and
    /// [`MergeState::Discarded`], and `false` for every other state.
    pub fn is_terminal(self) -> bool {
        // Listed exhaustively so that adding a variant forces a decision here.
        match self {
            Self::Merged | Self::Discarded => true,
            Self::Queued | Self::Merging | Self::NeedsReview => false,
        }
    }
}
147
/// Snapshot of one loop's merge status, built by replaying its events.
#[derive(Debug, Clone)]
pub struct MergeEntry {
    /// Identifier of the loop.
    pub loop_id: String,

    /// Prompt from the most recent `Queued` event (empty if none was seen).
    pub prompt: String,

    /// State after applying the loop's latest event.
    pub state: MergeState,

    /// Timestamp of the most recent `Queued` event, or of the loop's first
    /// event if it was never explicitly queued.
    pub queued_at: DateTime<Utc>,

    /// Process id from the most recent `Merging` event, if any.
    pub merge_pid: Option<u32>,

    /// Commit from the most recent `Merged` event, if any.
    pub merge_commit: Option<String>,

    /// Reason from the most recent `NeedsReview` event, if any.
    pub failure_reason: Option<String>,

    /// Reason from the most recent `Discarded` event, if one was given.
    pub discard_reason: Option<String>,
}
175
/// Errors returned by [`MergeQueue`] operations and the merge helper functions.
#[derive(Debug, thiserror::Error)]
pub enum MergeQueueError {
    /// An underlying I/O operation failed (file access, locking, spawning git).
    #[error("IO error: {0}")]
    Io(#[from] io::Error),

    /// A queue line could not be deserialized, or an event could not be
    /// serialized; the payload is the formatted serde error.
    #[error("Failed to parse merge queue: {0}")]
    ParseError(String),

    /// No event has been recorded for the given loop id.
    #[error("Loop not found in queue: {0}")]
    NotFound(String),

    /// The requested state change is not allowed: (loop id, current state,
    /// requested state).
    #[error("Invalid state transition for {0}: cannot transition from {1:?} to {2:?}")]
    InvalidTransition(String, MergeState, MergeState),

    /// Returned by the locking helpers on non-Unix targets.
    #[error("File locking not supported on this platform")]
    UnsupportedPlatform,
}
199
/// Handle to the append-only merge queue log of one workspace.
///
/// The handle holds only the path; the file is opened and locked for each
/// operation.
pub struct MergeQueue {
    /// Full path of the queue log (workspace root joined with `QUEUE_FILE`).
    queue_path: PathBuf,
}
208
209impl MergeQueue {
    /// Location of the queue log, relative to the workspace root.
    pub const QUEUE_FILE: &'static str = ".ralph/merge-queue.jsonl";
212
213 pub fn new(workspace_root: impl AsRef<Path>) -> Self {
215 Self {
216 queue_path: workspace_root.as_ref().join(Self::QUEUE_FILE),
217 }
218 }
219
220 pub fn enqueue(&self, loop_id: &str, prompt: &str) -> Result<(), MergeQueueError> {
227 let event = MergeEvent {
228 ts: Utc::now(),
229 loop_id: loop_id.to_string(),
230 event: MergeEventType::Queued {
231 prompt: prompt.to_string(),
232 },
233 };
234 self.append_event(&event)
235 }
236
237 pub fn mark_merging(&self, loop_id: &str, pid: u32) -> Result<(), MergeQueueError> {
244 let entry = self.get_entry(loop_id)?;
246 match entry {
247 Some(e) if e.state == MergeState::Queued || e.state == MergeState::NeedsReview => {}
248 Some(e) => {
249 return Err(MergeQueueError::InvalidTransition(
250 loop_id.to_string(),
251 e.state,
252 MergeState::Merging,
253 ));
254 }
255 None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
256 }
257
258 let event = MergeEvent {
259 ts: Utc::now(),
260 loop_id: loop_id.to_string(),
261 event: MergeEventType::Merging { pid },
262 };
263 self.append_event(&event)
264 }
265
266 pub fn mark_merged(&self, loop_id: &str, commit: &str) -> Result<(), MergeQueueError> {
273 let entry = self.get_entry(loop_id)?;
275 match entry {
276 Some(e) if e.state == MergeState::Merging => {}
277 Some(e) => {
278 return Err(MergeQueueError::InvalidTransition(
279 loop_id.to_string(),
280 e.state,
281 MergeState::Merged,
282 ));
283 }
284 None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
285 }
286
287 let event = MergeEvent {
288 ts: Utc::now(),
289 loop_id: loop_id.to_string(),
290 event: MergeEventType::Merged {
291 commit: commit.to_string(),
292 },
293 };
294 self.append_event(&event)
295 }
296
297 pub fn mark_needs_review(&self, loop_id: &str, reason: &str) -> Result<(), MergeQueueError> {
304 let entry = self.get_entry(loop_id)?;
306 match entry {
307 Some(e) if e.state == MergeState::Merging => {}
308 Some(e) => {
309 return Err(MergeQueueError::InvalidTransition(
310 loop_id.to_string(),
311 e.state,
312 MergeState::NeedsReview,
313 ));
314 }
315 None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
316 }
317
318 let event = MergeEvent {
319 ts: Utc::now(),
320 loop_id: loop_id.to_string(),
321 event: MergeEventType::NeedsReview {
322 reason: reason.to_string(),
323 },
324 };
325 self.append_event(&event)
326 }
327
328 pub fn discard(&self, loop_id: &str, reason: Option<&str>) -> Result<(), MergeQueueError> {
335 let entry = self.get_entry(loop_id)?;
337 match entry {
338 Some(e) if e.state == MergeState::Queued || e.state == MergeState::NeedsReview => {}
339 Some(e) => {
340 return Err(MergeQueueError::InvalidTransition(
341 loop_id.to_string(),
342 e.state,
343 MergeState::Discarded,
344 ));
345 }
346 None => return Err(MergeQueueError::NotFound(loop_id.to_string())),
347 }
348
349 let event = MergeEvent {
350 ts: Utc::now(),
351 loop_id: loop_id.to_string(),
352 event: MergeEventType::Discarded {
353 reason: reason.map(String::from),
354 },
355 };
356 self.append_event(&event)
357 }
358
359 pub fn next_pending(&self) -> Result<Option<MergeEntry>, MergeQueueError> {
363 let entries = self.list()?;
364 Ok(entries.into_iter().find(|e| e.state == MergeState::Queued))
365 }
366
367 pub fn get_entry(&self, loop_id: &str) -> Result<Option<MergeEntry>, MergeQueueError> {
369 let entries = self.list()?;
370 Ok(entries.into_iter().find(|e| e.loop_id == loop_id))
371 }
372
373 pub fn list(&self) -> Result<Vec<MergeEntry>, MergeQueueError> {
377 let events = self.read_all_events()?;
378 Ok(Self::derive_state(&events))
379 }
380
381 pub fn list_by_state(&self, state: MergeState) -> Result<Vec<MergeEntry>, MergeQueueError> {
383 let entries = self.list()?;
384 Ok(entries.into_iter().filter(|e| e.state == state).collect())
385 }
386
387 fn read_all_events(&self) -> Result<Vec<MergeEvent>, MergeQueueError> {
389 if !self.queue_path.exists() {
390 return Ok(Vec::new());
391 }
392
393 self.with_shared_lock(|file| {
394 let reader = BufReader::new(file);
395 let mut events = Vec::new();
396
397 for (line_num, line) in reader.lines().enumerate() {
398 let line = line?;
399 if line.trim().is_empty() {
400 continue;
401 }
402
403 let event: MergeEvent = serde_json::from_str(&line).map_err(|e| {
404 MergeQueueError::ParseError(format!("Line {}: {}", line_num + 1, e))
405 })?;
406 events.push(event);
407 }
408
409 Ok(events)
410 })
411 }
412
413 fn derive_state(events: &[MergeEvent]) -> Vec<MergeEntry> {
415 use std::collections::HashMap;
416
417 let mut loop_states: HashMap<String, MergeEntry> = HashMap::new();
419
420 for event in events {
421 let entry = loop_states
422 .entry(event.loop_id.clone())
423 .or_insert_with(|| MergeEntry {
424 loop_id: event.loop_id.clone(),
425 prompt: String::new(),
426 state: MergeState::Queued,
427 queued_at: event.ts,
428 merge_pid: None,
429 merge_commit: None,
430 failure_reason: None,
431 discard_reason: None,
432 });
433
434 match &event.event {
435 MergeEventType::Queued { prompt } => {
436 entry.prompt = prompt.clone();
437 entry.state = MergeState::Queued;
438 entry.queued_at = event.ts;
439 }
440 MergeEventType::Merging { pid } => {
441 entry.state = MergeState::Merging;
442 entry.merge_pid = Some(*pid);
443 }
444 MergeEventType::Merged { commit } => {
445 entry.state = MergeState::Merged;
446 entry.merge_commit = Some(commit.clone());
447 }
448 MergeEventType::NeedsReview { reason } => {
449 entry.state = MergeState::NeedsReview;
450 entry.failure_reason = Some(reason.clone());
451 }
452 MergeEventType::Discarded { reason } => {
453 entry.state = MergeState::Discarded;
454 entry.discard_reason = reason.clone();
455 }
456 }
457 }
458
459 let mut entries: Vec<_> = loop_states.into_values().collect();
461 entries.sort_by_key(|a| a.queued_at);
462 entries
463 }
464
465 fn append_event(&self, event: &MergeEvent) -> Result<(), MergeQueueError> {
467 self.with_exclusive_lock(|mut file| {
468 file.seek(SeekFrom::End(0))?;
470
471 let json = serde_json::to_string(event)
473 .map_err(|e| MergeQueueError::ParseError(e.to_string()))?;
474 writeln!(file, "{}", json)?;
475
476 file.sync_all()?;
477 Ok(())
478 })
479 }
480
481 #[cfg(unix)]
483 fn with_shared_lock<T, F>(&self, f: F) -> Result<T, MergeQueueError>
484 where
485 F: FnOnce(&File) -> Result<T, MergeQueueError>,
486 {
487 use nix::fcntl::{Flock, FlockArg};
488
489 let file = File::open(&self.queue_path)?;
490
491 let flock = Flock::lock(file, FlockArg::LockShared).map_err(|(_, errno)| {
493 MergeQueueError::Io(io::Error::new(
494 io::ErrorKind::Other,
495 format!("flock failed: {}", errno),
496 ))
497 })?;
498
499 use std::os::fd::AsFd;
501 let borrowed_fd = flock.as_fd();
502 let owned_fd = borrowed_fd.try_clone_to_owned()?;
503 let file: File = owned_fd.into();
504
505 f(&file)
506 }
507
508 #[cfg(not(unix))]
509 fn with_shared_lock<T, F>(&self, _f: F) -> Result<T, MergeQueueError>
510 where
511 F: FnOnce(&File) -> Result<T, MergeQueueError>,
512 {
513 Err(MergeQueueError::UnsupportedPlatform)
514 }
515
516 #[cfg(unix)]
518 fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T, MergeQueueError>
519 where
520 F: FnOnce(File) -> Result<T, MergeQueueError>,
521 {
522 use nix::fcntl::{Flock, FlockArg};
523
524 if let Some(parent) = self.queue_path.parent() {
526 fs::create_dir_all(parent)?;
527 }
528
529 let file = OpenOptions::new()
531 .read(true)
532 .write(true)
533 .create(true)
534 .truncate(false)
535 .open(&self.queue_path)?;
536
537 let flock = Flock::lock(file, FlockArg::LockExclusive).map_err(|(_, errno)| {
539 MergeQueueError::Io(io::Error::new(
540 io::ErrorKind::Other,
541 format!("flock failed: {}", errno),
542 ))
543 })?;
544
545 use std::os::fd::AsFd;
547 let borrowed_fd = flock.as_fd();
548 let owned_fd = borrowed_fd.try_clone_to_owned()?;
549 let file: File = owned_fd.into();
550
551 f(file)
552 }
553
554 #[cfg(not(unix))]
555 fn with_exclusive_lock<T, F>(&self, _f: F) -> Result<T, MergeQueueError>
556 where
557 F: FnOnce(File) -> Result<T, MergeQueueError>,
558 {
559 Err(MergeQueueError::UnsupportedPlatform)
560 }
561}
562
563pub fn merge_button_state(
569 workspace: &Path,
570 loop_id: &str,
571) -> Result<MergeButtonState, MergeQueueError> {
572 let queue = MergeQueue::new(workspace);
573
574 if let Some(entry) = queue.get_entry(loop_id)?
576 && entry.state == MergeState::Merging
577 {
578 return Ok(MergeButtonState::Blocked {
579 reason: "Merge already in progress".to_string(),
580 });
581 }
582
583 if let Ok(Some(metadata)) = LoopLock::read_existing(workspace) {
587 if is_pid_alive(metadata.pid) {
589 return Ok(MergeButtonState::Blocked {
590 reason: format!("primary loop running: {}", metadata.prompt),
591 });
592 }
593 }
594
595 Ok(MergeButtonState::Active)
596}
597
598fn is_pid_alive(pid: u32) -> bool {
600 #[cfg(unix)]
601 {
602 use nix::sys::signal::kill;
603 use nix::unistd::Pid;
604 kill(Pid::from_raw(pid as i32), None).is_ok()
606 }
607
608 #[cfg(not(unix))]
609 {
610 true
612 }
613}
614
615pub fn smart_merge_summary(workspace: &Path, loop_id: &str) -> Result<String, MergeQueueError> {
621 let branch_name = format!("ralph/{}", loop_id);
622
623 let output = Command::new("git")
625 .args([
626 "log",
627 "--oneline",
628 "--no-walk=unsorted",
629 &format!("main..{}", branch_name),
630 ])
631 .current_dir(workspace)
632 .output()?;
633
634 let log_output = String::from_utf8_lossy(&output.stdout);
635 let lines: Vec<&str> = log_output.lines().collect();
636
637 let summary = if lines.is_empty() {
639 let output = Command::new("git")
641 .args(["log", "-1", "--oneline", &branch_name])
642 .current_dir(workspace)
643 .output()?;
644
645 let msg = String::from_utf8_lossy(&output.stdout);
646 extract_summary_from_line(msg.trim())
647 } else {
648 extract_summary_from_line(lines[0])
650 };
651
652 let prefix_len = 14; let suffix_len = 8 + loop_id.len(); let max_summary_len = 72 - prefix_len - suffix_len;
656
657 let summary = truncate_with_ellipsis(&summary, max_summary_len);
659
660 Ok(summary)
661}
662
/// Returns everything after the first space in `line`, or the whole line if
/// it contains no space.
///
/// For `git log --oneline` output this drops the leading abbreviated hash.
fn extract_summary_from_line(line: &str) -> String {
    match line.split_once(' ') {
        Some((_hash, subject)) => subject.to_string(),
        None => line.to_string(),
    }
}
672
673pub fn merge_needs_steering(
675 workspace: &Path,
676 loop_id: &str,
677) -> Result<SteeringDecision, MergeQueueError> {
678 let branch_name = format!("ralph/{}", loop_id);
679
680 let output = Command::new("git")
682 .args(["merge-tree", "--write-tree", "main", &branch_name])
683 .current_dir(workspace)
684 .output()?;
685
686 let has_conflicts =
688 !output.status.success() || String::from_utf8_lossy(&output.stdout).contains("CONFLICT");
689
690 if has_conflicts {
691 let diff_output = Command::new("git")
693 .args(["diff", "--name-only", "main", &branch_name])
694 .current_dir(workspace)
695 .output()?;
696
697 let files = String::from_utf8_lossy(&diff_output.stdout);
698 let file_list: Vec<&str> = files.lines().take(3).collect();
699
700 let reason = if file_list.is_empty() {
701 "Potential conflict detected".to_string()
702 } else {
703 format!("Files modified on both branches: {}", file_list.join(", "))
704 };
705
706 Ok(SteeringDecision {
707 needs_input: true,
708 reason,
709 options: vec![
710 MergeOption {
711 label: "Use ours (main)".to_string(),
712 },
713 MergeOption {
714 label: "Use theirs (branch)".to_string(),
715 },
716 MergeOption {
717 label: "Manual resolution".to_string(),
718 },
719 ],
720 })
721 } else {
722 Ok(SteeringDecision {
723 needs_input: false,
724 reason: String::new(),
725 options: vec![],
726 })
727 }
728}
729
730pub fn merge_execution_summary(workspace: &Path, loop_id: &str) -> Result<String, MergeQueueError> {
734 let branch_name = format!("ralph/{}", loop_id);
735
736 let count_output = Command::new("git")
738 .args(["rev-list", "--count", &format!("main..{}", branch_name)])
739 .current_dir(workspace)
740 .output()?;
741
742 let commit_count = String::from_utf8_lossy(&count_output.stdout)
743 .trim()
744 .parse::<usize>()
745 .unwrap_or(0);
746
747 let files_output = Command::new("git")
749 .args(["diff", "--name-only", "main", &branch_name])
750 .current_dir(workspace)
751 .output()?;
752
753 let files = String::from_utf8_lossy(&files_output.stdout);
754 let file_count = files.lines().count();
755
756 let log_output = Command::new("git")
758 .args(["log", "-1", "--format=%s", &branch_name])
759 .current_dir(workspace)
760 .output()?;
761
762 let last_commit = String::from_utf8_lossy(&log_output.stdout)
763 .trim()
764 .to_string();
765
766 let summary = format!(
768 "{} commit{}, {} file{} changed: {}",
769 commit_count,
770 if commit_count == 1 { "" } else { "s" },
771 file_count,
772 if file_count == 1 { "" } else { "s" },
773 last_commit
774 );
775
776 Ok(summary)
777}
778
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates an empty temporary workspace and a queue rooted in it.
    ///
    /// The `TempDir` is returned so the directory outlives the queue.
    fn temp_queue() -> (TempDir, MergeQueue) {
        let workspace = TempDir::new().unwrap();
        let queue = MergeQueue::new(workspace.path());
        (workspace, queue)
    }

    /// Fetches an entry the test expects to exist.
    fn entry_of(queue: &MergeQueue, loop_id: &str) -> MergeEntry {
        queue.get_entry(loop_id).unwrap().unwrap()
    }

    #[test]
    fn test_enqueue() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-123", "implement auth").unwrap();

        let entries = queue.list().unwrap();
        assert_eq!(entries.len(), 1);
        let only = &entries[0];
        assert_eq!(only.loop_id, "loop-123");
        assert_eq!(only.prompt, "implement auth");
        assert_eq!(only.state, MergeState::Queued);
    }

    #[test]
    fn test_full_merge_lifecycle() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-abc", "test prompt").unwrap();
        assert_eq!(entry_of(&queue, "loop-abc").state, MergeState::Queued);

        queue.mark_merging("loop-abc", 12345).unwrap();
        let merging = entry_of(&queue, "loop-abc");
        assert_eq!(merging.state, MergeState::Merging);
        assert_eq!(merging.merge_pid, Some(12345));

        queue.mark_merged("loop-abc", "commit-sha-123").unwrap();
        let merged = entry_of(&queue, "loop-abc");
        assert_eq!(merged.state, MergeState::Merged);
        assert_eq!(merged.merge_commit, Some("commit-sha-123".to_string()));
    }

    #[test]
    fn test_merge_needs_review() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-def", "test").unwrap();
        queue.mark_merging("loop-def", 99999).unwrap();
        queue
            .mark_needs_review("loop-def", "Conflicting changes in src/auth.rs")
            .unwrap();

        let reviewed = entry_of(&queue, "loop-def");
        assert_eq!(reviewed.state, MergeState::NeedsReview);
        assert_eq!(
            reviewed.failure_reason,
            Some("Conflicting changes in src/auth.rs".to_string())
        );
    }

    #[test]
    fn test_discard_from_queued() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-xyz", "test").unwrap();
        queue.discard("loop-xyz", Some("No longer needed")).unwrap();

        let discarded = entry_of(&queue, "loop-xyz");
        assert_eq!(discarded.state, MergeState::Discarded);
        assert_eq!(
            discarded.discard_reason,
            Some("No longer needed".to_string())
        );
    }

    #[test]
    fn test_discard_from_needs_review() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-xyz", "test").unwrap();
        queue.mark_merging("loop-xyz", 123).unwrap();
        queue.mark_needs_review("loop-xyz", "conflicts").unwrap();
        queue.discard("loop-xyz", None).unwrap();

        assert_eq!(entry_of(&queue, "loop-xyz").state, MergeState::Discarded);
    }

    #[test]
    fn test_next_pending_fifo() {
        let (_workspace, queue) = temp_queue();
        // Pause between enqueues so the three timestamps differ.
        let pause = std::time::Duration::from_millis(10);

        queue.enqueue("loop-1", "first").unwrap();
        std::thread::sleep(pause);
        queue.enqueue("loop-2", "second").unwrap();
        std::thread::sleep(pause);
        queue.enqueue("loop-3", "third").unwrap();

        let head = queue.next_pending().unwrap().unwrap();
        assert_eq!(head.loop_id, "loop-1");

        queue.mark_merging("loop-1", 123).unwrap();

        let head = queue.next_pending().unwrap().unwrap();
        assert_eq!(head.loop_id, "loop-2");
    }

    #[test]
    fn test_invalid_transition_queued_to_merged() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-xyz", "test").unwrap();

        let outcome = queue.mark_merged("loop-xyz", "commit");
        assert!(matches!(
            outcome,
            Err(MergeQueueError::InvalidTransition(
                _,
                MergeState::Queued,
                MergeState::Merged
            ))
        ));
    }

    #[test]
    fn test_invalid_transition_merged_to_needs_review() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-xyz", "test").unwrap();
        queue.mark_merging("loop-xyz", 123).unwrap();
        queue.mark_merged("loop-xyz", "abc").unwrap();

        let outcome = queue.mark_needs_review("loop-xyz", "error");
        assert!(matches!(
            outcome,
            Err(MergeQueueError::InvalidTransition(
                _,
                MergeState::Merged,
                MergeState::NeedsReview
            ))
        ));
    }

    #[test]
    fn test_not_found() {
        let (_workspace, queue) = temp_queue();

        let outcome = queue.mark_merging("nonexistent", 123);
        assert!(matches!(outcome, Err(MergeQueueError::NotFound(_))));
    }

    #[test]
    fn test_retry_from_needs_review() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-retry", "test").unwrap();
        queue.mark_merging("loop-retry", 100).unwrap();
        queue.mark_needs_review("loop-retry", "conflicts").unwrap();

        // A second attempt is allowed from NeedsReview and records the new pid.
        queue.mark_merging("loop-retry", 200).unwrap();
        let retried = entry_of(&queue, "loop-retry");
        assert_eq!(retried.state, MergeState::Merging);
        assert_eq!(retried.merge_pid, Some(200));
    }

    #[test]
    fn test_list_by_state() {
        let (_workspace, queue) = temp_queue();

        queue.enqueue("loop-1", "test 1").unwrap();
        queue.enqueue("loop-2", "test 2").unwrap();
        queue.enqueue("loop-3", "test 3").unwrap();

        queue.mark_merging("loop-1", 123).unwrap();
        queue.mark_merged("loop-1", "abc").unwrap();

        queue.mark_merging("loop-2", 456).unwrap();

        let queued = queue.list_by_state(MergeState::Queued).unwrap();
        assert_eq!(queued.len(), 1);
        assert_eq!(queued[0].loop_id, "loop-3");

        let merging = queue.list_by_state(MergeState::Merging).unwrap();
        assert_eq!(merging.len(), 1);
        assert_eq!(merging[0].loop_id, "loop-2");

        let merged = queue.list_by_state(MergeState::Merged).unwrap();
        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].loop_id, "loop-1");
    }

    #[test]
    fn test_empty_queue() {
        let (_workspace, queue) = temp_queue();

        assert!(queue.list().unwrap().is_empty());
        assert!(queue.next_pending().unwrap().is_none());
    }

    #[test]
    fn test_persistence() {
        let workspace = TempDir::new().unwrap();

        {
            let writer = MergeQueue::new(workspace.path());
            writer.enqueue("loop-persist", "test persistence").unwrap();
        }

        {
            // A fresh handle on the same workspace sees the earlier event.
            let reader = MergeQueue::new(workspace.path());
            let entries = reader.list().unwrap();
            assert_eq!(entries.len(), 1);
            assert_eq!(entries[0].loop_id, "loop-persist");
            assert_eq!(entries[0].prompt, "test persistence");
        }
    }

    #[test]
    fn test_event_serialization() {
        let original = MergeEvent {
            ts: Utc::now(),
            loop_id: "loop-test".to_string(),
            event: MergeEventType::Queued {
                prompt: "test prompt".to_string(),
            },
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: MergeEvent = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.loop_id, original.loop_id);
        let MergeEventType::Queued { prompt } = decoded.event else {
            panic!("Wrong event type");
        };
        assert_eq!(prompt, "test prompt");
    }

    #[test]
    fn test_creates_ralph_directory() {
        let workspace = TempDir::new().unwrap();
        let ralph_dir = workspace.path().join(".ralph");
        let queue_file = ralph_dir.join("merge-queue.jsonl");

        assert!(!ralph_dir.exists());
        assert!(!queue_file.exists());

        MergeQueue::new(workspace.path())
            .enqueue("loop-dir", "test")
            .unwrap();

        assert!(ralph_dir.exists());
        assert!(queue_file.exists());
    }
}