1#![allow(dead_code)]
19
20use crate::commands::run::supervision::{
21 ContinueSession, capture_ci_gate_result, resume_continue_session,
22};
23use crate::config::Resolved;
24use crate::contracts::TaskStatus;
25use crate::git;
26use crate::git::error::git_output;
27use crate::queue;
28use crate::runutil::sleep_with_cancellation;
29use crate::timeutil;
30use anyhow::{Context, Result, bail};
31use serde::{Deserialize, Serialize};
32use std::path::{Path, PathBuf};
33use std::time::Duration;
34
35#[derive(Debug, Clone)]
41pub struct IntegrationConfig {
42 pub max_attempts: u32,
44 pub backoff_ms: Vec<u64>,
46 pub target_branch: String,
48 pub ci_enabled: bool,
50 pub ci_label: String,
52}
53
54impl IntegrationConfig {
55 pub fn from_resolved(resolved: &Resolved, target_branch: &str) -> Self {
56 let parallel = &resolved.config.parallel;
57 let target_branch = target_branch.trim();
58 Self {
59 max_attempts: parallel.max_push_attempts.unwrap_or(50) as u32,
60 backoff_ms: parallel
61 .push_backoff_ms
62 .clone()
63 .unwrap_or_else(super::default_push_backoff_ms),
64 target_branch: if target_branch.is_empty() {
65 "main".to_string()
66 } else {
67 target_branch.to_string()
68 },
69 ci_enabled: resolved
70 .config
71 .agent
72 .ci_gate
73 .as_ref()
74 .is_some_and(|ci_gate| ci_gate.is_enabled()),
75 ci_label: crate::commands::run::supervision::ci_gate_command_label(resolved),
76 }
77 }
78
79 pub fn backoff_for_attempt(&self, attempt: usize) -> Duration {
81 let ms = self
82 .backoff_ms
83 .get(attempt)
84 .copied()
85 .unwrap_or_else(|| self.backoff_ms.last().copied().unwrap_or(10_000));
86 Duration::from_millis(ms)
87 }
88}
89
/// Terminal result of an integration run.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IntegrationOutcome {
    /// All compliance checks passed and HEAD is contained in the remote target branch.
    Success,
    /// The push could not be completed; `reason` explains what was still failing.
    BlockedPush { reason: String },
    /// The integration failed for the given `reason`.
    Failed { reason: String },
}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107pub(crate) struct BlockedPushMarker {
108 pub task_id: String,
109 pub reason: String,
110 pub attempt: u32,
111 pub max_attempts: u32,
112 pub generated_at: String,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct RemediationHandoff {
122 pub task_id: String,
124 pub task_title: String,
126 pub target_branch: String,
128 pub attempt: u32,
130 pub max_attempts: u32,
132 pub conflict_files: Vec<String>,
134 pub git_status: String,
136 pub phase_summary: String,
138 pub task_intent: String,
140 #[serde(skip_serializing_if = "Option::is_none")]
142 pub ci_context: Option<CiContext>,
143 pub generated_at: String,
145 pub queue_done_rules: QueueDoneRules,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct CiContext {
152 pub command: String,
153 pub last_output: String,
154 pub exit_code: i32,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
159pub struct QueueDoneRules {
160 pub rules: Vec<String>,
161}
162
163impl Default for QueueDoneRules {
164 fn default() -> Self {
165 Self {
166 rules: vec![
167 "Remove the completed task from the queue file".into(),
168 "Ensure the completed task is present in the done archive file".into(),
169 "Preserve entries from other workers exactly".into(),
170 "Preserve task metadata (timestamps/notes)".into(),
171 ],
172 }
173 }
174}
175
176impl RemediationHandoff {
177 pub fn new(
178 task_id: impl Into<String>,
179 task_title: impl Into<String>,
180 target_branch: impl Into<String>,
181 attempt: u32,
182 max_attempts: u32,
183 ) -> Self {
184 Self {
185 task_id: task_id.into(),
186 task_title: task_title.into(),
187 target_branch: target_branch.into(),
188 attempt,
189 max_attempts,
190 conflict_files: Vec::new(),
191 git_status: String::new(),
192 phase_summary: String::new(),
193 task_intent: String::new(),
194 ci_context: None,
195 generated_at: timeutil::now_utc_rfc3339_or_fallback(),
196 queue_done_rules: QueueDoneRules::default(),
197 }
198 }
199
200 pub fn with_conflicts(mut self, files: Vec<String>) -> Self {
201 self.conflict_files = files;
202 self
203 }
204
205 pub fn with_git_status(mut self, status: String) -> Self {
206 self.git_status = status;
207 self
208 }
209
210 pub fn with_phase_summary(mut self, summary: String) -> Self {
211 self.phase_summary = summary;
212 self
213 }
214
215 pub fn with_task_intent(mut self, intent: String) -> Self {
216 self.task_intent = intent;
217 self
218 }
219
220 pub fn with_ci_context(mut self, command: String, last_output: String, exit_code: i32) -> Self {
221 self.ci_context = Some(CiContext {
222 command,
223 last_output,
224 exit_code,
225 });
226 self
227 }
228}
229
230fn blocked_push_marker_path(workspace_path: &Path) -> PathBuf {
231 workspace_path.join(super::BLOCKED_PUSH_MARKER_FILE)
232}
233
234fn write_blocked_push_marker(
235 workspace_path: &Path,
236 task_id: &str,
237 reason: &str,
238 attempt: u32,
239 max_attempts: u32,
240) -> Result<()> {
241 let marker = BlockedPushMarker {
242 task_id: task_id.trim().to_string(),
243 reason: reason.to_string(),
244 attempt,
245 max_attempts,
246 generated_at: timeutil::now_utc_rfc3339_or_fallback(),
247 };
248 let marker_path = blocked_push_marker_path(workspace_path);
249 if let Some(parent) = marker_path.parent() {
250 std::fs::create_dir_all(parent)
251 .with_context(|| format!("create blocked marker directory {}", parent.display()))?;
252 }
253 let rendered = serde_json::to_string_pretty(&marker).context("serialize blocked marker")?;
254 crate::fsutil::write_atomic(&marker_path, rendered.as_bytes())
255 .with_context(|| format!("write blocked marker {}", marker_path.display()))?;
256 Ok(())
257}
258
259fn clear_blocked_push_marker(workspace_path: &Path) {
260 let marker_path = blocked_push_marker_path(workspace_path);
261 if !marker_path.exists() {
262 return;
263 }
264 if let Err(err) = std::fs::remove_file(&marker_path) {
265 log::warn!(
266 "Failed to clear blocked marker at {}: {}",
267 marker_path.display(),
268 err
269 );
270 }
271}
272
273pub(crate) fn read_blocked_push_marker(workspace_path: &Path) -> Result<Option<BlockedPushMarker>> {
274 let marker_path = blocked_push_marker_path(workspace_path);
275 if !marker_path.exists() {
276 return Ok(None);
277 }
278 let raw = std::fs::read_to_string(&marker_path)
279 .with_context(|| format!("read blocked marker {}", marker_path.display()))?;
280 let marker =
281 serde_json::from_str::<BlockedPushMarker>(&raw).context("parse blocked marker json")?;
282 Ok(Some(marker))
283}
284
285pub fn write_handoff_packet(
287 workspace_path: &Path,
288 task_id: &str,
289 attempt: u32,
290 handoff: &RemediationHandoff,
291) -> Result<PathBuf> {
292 let handoff_dir = workspace_path
293 .join(".ralph/cache/parallel/handoffs")
294 .join(task_id);
295 std::fs::create_dir_all(&handoff_dir)
296 .with_context(|| format!("create handoff directory {}", handoff_dir.display()))?;
297
298 let path = handoff_dir.join(format!("attempt_{}.json", attempt));
299 let content = serde_json::to_string_pretty(handoff).context("serialize handoff packet")?;
300 crate::fsutil::write_atomic(&path, content.as_bytes())
301 .with_context(|| format!("write handoff packet to {}", path.display()))?;
302 Ok(path)
303}
304
/// Outcome of the post-attempt compliance checks.
#[derive(Clone, Debug)]
pub struct ComplianceResult {
    /// `true` when git still reports conflicted files.
    pub has_unresolved_conflicts: bool,
    /// `true` when the queue and done files pass semantic validation.
    pub queue_done_valid: bool,
    /// `true` when the task left the queue and is recorded as done.
    pub task_archived: bool,
    /// `true` when the CI gate passed (or was not enabled).
    pub ci_passed: bool,
    /// Paths of the files that are still conflicted.
    pub conflict_files: Vec<String>,
    /// All failure messages joined with `"; "`, or `None` when nothing failed.
    pub validation_error: Option<String>,
}

impl ComplianceResult {
    /// Returns `true` only when no conflicts remain and every check flag is set.
    ///
    /// `conflict_files` and `validation_error` are informational and do not
    /// take part in the decision.
    pub fn all_passed(&self) -> bool {
        let any_failure = self.has_unresolved_conflicts
            || !self.queue_done_valid
            || !self.task_archived
            || !self.ci_passed;
        !any_failure
    }
}
328
329pub fn run_compliance_checks(
331 repo_root: &Path,
332 resolved: &Resolved,
333 task_id: &str,
334 ci_enabled: bool,
335) -> Result<ComplianceResult> {
336 let conflict_files = git::list_conflict_files(repo_root)?;
337 let has_unresolved_conflicts = !conflict_files.is_empty();
338
339 let mut errors = Vec::new();
340 if has_unresolved_conflicts {
341 errors.push("unresolved merge conflicts remain".to_string());
342 }
343
344 let queue_done_valid = match validate_queue_done_semantics(repo_root, resolved) {
345 Ok(()) => true,
346 Err(err) => {
347 errors.push(format!("queue/done semantic validation failed: {}", err));
348 false
349 }
350 };
351
352 let task_archived = match validate_task_archived(resolved, task_id) {
353 Ok(()) => true,
354 Err(err) => {
355 errors.push(format!("task archival validation failed: {}", err));
356 false
357 }
358 };
359
360 let ci_passed = if ci_enabled {
361 match run_ci_check(repo_root, resolved) {
362 Ok(()) => true,
363 Err(err) => {
364 errors.push(format!("CI gate failed: {}", err));
365 false
366 }
367 }
368 } else {
369 true
370 };
371
372 Ok(ComplianceResult {
373 has_unresolved_conflicts,
374 queue_done_valid,
375 task_archived,
376 ci_passed,
377 conflict_files,
378 validation_error: if errors.is_empty() {
379 None
380 } else {
381 Some(errors.join("; "))
382 },
383 })
384}
385
386fn validate_queue_done_semantics(_repo_root: &Path, resolved: &Resolved) -> Result<()> {
388 let queue_path = resolved.queue_path.clone();
389 let done_path = resolved.done_path.clone();
390
391 let queue = queue::load_queue(&queue_path).context("load queue for validation")?;
392 let max_depth = resolved.config.queue.max_dependency_depth.unwrap_or(10);
393 let done = if done_path.exists() {
394 Some(queue::load_queue(&done_path).context("load done for validation")?)
395 } else {
396 None
397 };
398
399 queue::validate_queue_set(
400 &queue,
401 done.as_ref(),
402 &resolved.id_prefix,
403 resolved.id_width,
404 max_depth,
405 )
406 .context("queue/done semantic validation")?;
407
408 Ok(())
409}
410
411fn validate_task_archived(resolved: &Resolved, task_id: &str) -> Result<()> {
413 let task_id = task_id.trim();
414 if task_id.is_empty() {
415 bail!("task id is empty");
416 }
417
418 let queue_path = resolved.queue_path.clone();
419 let done_path = resolved.done_path.clone();
420
421 if !queue_path.exists() {
422 bail!("queue file missing at {}", queue_path.display());
423 }
424 if !done_path.exists() {
425 bail!("done file missing at {}", done_path.display());
426 }
427
428 let queue_file = queue::load_queue(&queue_path)
429 .with_context(|| format!("load queue file {}", queue_path.display()))?;
430 if queue_file
431 .tasks
432 .iter()
433 .any(|task| task.id.trim() == task_id)
434 {
435 bail!("task {} still present in {}", task_id, queue_path.display());
436 }
437
438 let done_file = queue::load_queue(&done_path)
439 .with_context(|| format!("load done file {}", done_path.display()))?;
440 let done_task = done_file
441 .tasks
442 .iter()
443 .find(|task| task.id.trim() == task_id)
444 .ok_or_else(|| anyhow::anyhow!("task {} missing from {}", task_id, done_path.display()))?;
445
446 if done_task.status != TaskStatus::Done {
447 bail!(
448 "task {} exists in done but status is {:?}, expected done",
449 task_id,
450 done_task.status
451 );
452 }
453
454 Ok(())
455}
456
457fn run_ci_check(_repo_root: &Path, resolved: &Resolved) -> Result<()> {
459 let result = capture_ci_gate_result(resolved)?;
460 if !result.success {
461 let combined = format!("{}\n{}", result.stdout, result.stderr).to_lowercase();
462 if combined.contains("waiting for file lock")
463 || combined.contains("file lock on build directory")
464 {
465 bail!(
466 "CI lock contention detected (stale build/test process likely holding a lock). {} | {}",
467 result.stdout.trim(),
468 result.stderr.trim()
469 );
470 }
471 bail!("{} | {}", result.stdout.trim(), result.stderr.trim());
472 }
473
474 Ok(())
475}
476
477fn head_is_synced_to_remote(repo_root: &Path, target_branch: &str) -> Result<bool> {
479 git::fetch_branch(repo_root, "origin", target_branch)
480 .with_context(|| format!("fetch origin/{} for sync check", target_branch))?;
481
482 let remote_ref = format!("origin/{}", target_branch);
483 let output = git_output(
484 repo_root,
485 &["merge-base", "--is-ancestor", "HEAD", &remote_ref],
486 )
487 .with_context(|| format!("check if HEAD is ancestor of {}", remote_ref))?;
488
489 if output.status.success() {
490 return Ok(true);
491 }
492 if output.status.code() == Some(1) {
493 return Ok(false);
494 }
495
496 let stderr = String::from_utf8_lossy(&output.stderr);
497 bail!(
498 "unable to verify push sync against {}: {}",
499 remote_ref,
500 stderr.trim()
501 );
502}
503
504#[allow(clippy::too_many_arguments)]
509fn build_agent_integration_prompt(
510 task_id: &str,
511 task_title: &str,
512 target_branch: &str,
513 queue_path: &Path,
514 done_path: &Path,
515 attempt: u32,
516 max_attempts: u32,
517 phase_summary: &str,
518 status_snapshot: &str,
519 ci_enabled: bool,
520 ci_label: &str,
521 previous_failure: Option<&str>,
522) -> String {
523 let queue_path_display = queue_path.display();
524 let done_path_display = done_path.display();
525 let failure_block = previous_failure.map_or_else(String::new, |failure| {
526 format!("\n## Previous Attempt Failed\n{}\n", failure)
527 });
528
529 let ci_block = if ci_enabled {
530 format!(
531 "- Run CI gate and fix failures before pushing: `{}`",
532 ci_label
533 )
534 } else {
535 "- CI gate is disabled for this task".to_string()
536 };
537
538 sanitize_prompt_for_runner(&format!(
539 r#"# Parallel Integration (Mandatory) - Attempt {attempt}/{max_attempts}
540You are finalizing task `{task_id}` (`{task_title}`) for direct push to `origin/{target_branch}`.
541
542## Hard Requirement
543You MUST execute integration git operations yourself in this turn. Do not stop early.
544You are NOT done until all required checks are satisfied.
545
546## Context
547- Phase summary: {phase_summary}
548- Current git status snapshot:
549```text
550{status_snapshot}
551```
552{failure_block}
553## Required Sequence
5541. `git fetch origin {target_branch}`
5552. Rebase on latest remote state: `git rebase origin/{target_branch}`
5563. If conflicts exist:
557 - Resolve every conflict marker while preserving both upstream and task intent.
558 - For queue/done files, preserve other workers' entries exactly.
559 - Ensure `{task_id}` is removed from queue and present as done in done.
560 - Continue rebase until complete (`git add ...`, `git rebase --continue`).
5614. Ensure bookkeeping is correct:
562 - `{queue_path_display}` does NOT contain `{task_id}`
563 - `{done_path_display}` DOES contain `{task_id}` with done status
5645. Stage and commit any remaining changes needed for integration.
5656. {ci_block}
5667. Push directly to base branch: `git push origin HEAD:{target_branch}`
5678. If push is rejected (non-fast-forward), repeat from step 1 in this same turn.
568
569## Completion Contract (Mandatory)
570Before ending your response:
571- No unresolved merge conflicts remain.
572- Push to `origin/{target_branch}` has succeeded.
573- Bookkeeping files are semantically correct for `{task_id}`.
574- CI has passed when enabled.
575
576If any check fails, keep working in this same turn until fixed.
577"#
578 ))
579}
580
/// Replaces every control character with a space, except for newline,
/// carriage return and tab, which are kept as they are.
fn sanitize_prompt_for_runner(prompt: &str) -> String {
    let mut cleaned = String::with_capacity(prompt.len());
    for ch in prompt.chars() {
        let keep = matches!(ch, '\n' | '\r' | '\t') || !ch.is_control();
        cleaned.push(if keep { ch } else { ' ' });
    }
    cleaned
}
593
594fn compose_block_reason(
595 compliance: &ComplianceResult,
596 pushed: bool,
597 extra: Option<&str>,
598) -> String {
599 let mut reasons = Vec::new();
600
601 if compliance.has_unresolved_conflicts {
602 reasons.push(format!(
603 "unresolved conflicts: {}",
604 compliance.conflict_files.join(", ")
605 ));
606 }
607 if !compliance.queue_done_valid {
608 reasons.push("queue/done semantic validation failed".to_string());
609 }
610 if !compliance.task_archived {
611 reasons.push("task archival validation failed".to_string());
612 }
613 if !compliance.ci_passed {
614 reasons.push("CI validation failed".to_string());
615 }
616 if !pushed {
617 reasons.push("HEAD is not yet integrated into target branch".to_string());
618 }
619 if let Some(extra) = extra {
620 reasons.push(extra.to_string());
621 }
622
623 if let Some(validation_error) = &compliance.validation_error {
624 reasons.push(validation_error.clone());
625 }
626
627 if reasons.is_empty() {
628 "integration did not satisfy completion contract".to_string()
629 } else {
630 reasons.join("; ")
631 }
632}
633
634#[allow(clippy::too_many_arguments)]
643pub(crate) fn run_integration_loop(
644 resolved: &Resolved,
645 task_id: &str,
646 task_title: &str,
647 config: &IntegrationConfig,
648 phase_summary: &str,
649 continue_session: &mut ContinueSession,
650 on_resume: &mut dyn FnMut(&crate::runner::RunnerOutput, Duration) -> Result<()>,
651 plugins: Option<&crate::plugins::registry::PluginRegistry>,
652) -> Result<IntegrationOutcome> {
653 let repo_root = &resolved.repo_root;
654 clear_blocked_push_marker(repo_root);
655 let mut previous_failure: Option<String> = None;
656
657 for attempt_index in 0..config.max_attempts {
658 let attempt = attempt_index + 1;
659 log::info!(
660 "Agent-owned integration attempt {}/{} for {}",
661 attempt,
662 config.max_attempts,
663 task_id
664 );
665
666 let status_snapshot = git::status_porcelain(repo_root).unwrap_or_default();
667 let prompt = build_agent_integration_prompt(
668 task_id,
669 task_title,
670 &config.target_branch,
671 &resolved.queue_path,
672 &resolved.done_path,
673 attempt,
674 config.max_attempts,
675 phase_summary,
676 &status_snapshot,
677 config.ci_enabled,
678 &config.ci_label,
679 previous_failure.as_deref(),
680 );
681
682 let (output, elapsed) =
683 match resume_continue_session(resolved, continue_session, &prompt, plugins) {
684 Ok(resume) => resume,
685 Err(err) => {
686 let reason = format!("integration continuation failed: {:#}", err);
687 if attempt >= config.max_attempts {
688 if let Err(marker_err) = write_blocked_push_marker(
689 repo_root,
690 task_id,
691 &reason,
692 attempt,
693 config.max_attempts,
694 ) {
695 log::warn!("Failed to write blocked marker: {}", marker_err);
696 }
697 return Ok(IntegrationOutcome::BlockedPush { reason });
698 }
699 previous_failure = Some(reason);
700 wait_before_retry(config, attempt_index as usize, task_id)?;
701 continue;
702 }
703 };
704
705 on_resume(&output, elapsed)?;
706
707 let compliance = run_compliance_checks(repo_root, resolved, task_id, config.ci_enabled)?;
708 let (pushed, push_check_error) =
709 match head_is_synced_to_remote(repo_root, &config.target_branch) {
710 Ok(value) => (value, None),
711 Err(err) => (false, Some(format!("push sync validation failed: {}", err))),
712 };
713
714 if compliance.all_passed() && pushed {
715 log::info!(
716 "Integration succeeded for {} on attempt {}/{}",
717 task_id,
718 attempt,
719 config.max_attempts
720 );
721 return Ok(IntegrationOutcome::Success);
722 }
723
724 let reason = compose_block_reason(&compliance, pushed, push_check_error.as_deref());
725 let mut handoff = RemediationHandoff::new(
726 task_id,
727 task_title,
728 &config.target_branch,
729 attempt,
730 config.max_attempts,
731 )
732 .with_conflicts(compliance.conflict_files.clone())
733 .with_git_status(git::status_porcelain(repo_root).unwrap_or_default())
734 .with_phase_summary(phase_summary.to_string())
735 .with_task_intent(format!("Complete task {}: {}", task_id, task_title));
736
737 if !compliance.ci_passed {
738 handoff = handoff.with_ci_context(
739 config.ci_label.clone(),
740 compliance
741 .validation_error
742 .clone()
743 .unwrap_or_else(|| "CI gate validation failed".to_string()),
744 1,
745 );
746 }
747
748 if let Err(err) = write_handoff_packet(repo_root, task_id, attempt, &handoff) {
749 log::warn!("Failed to persist remediation handoff packet: {}", err);
750 }
751
752 if attempt >= config.max_attempts {
753 if let Err(marker_err) =
754 write_blocked_push_marker(repo_root, task_id, &reason, attempt, config.max_attempts)
755 {
756 log::warn!("Failed to write blocked marker: {}", marker_err);
757 }
758 return Ok(IntegrationOutcome::BlockedPush { reason });
759 }
760
761 previous_failure = Some(reason);
762 wait_before_retry(config, attempt_index as usize, task_id)?;
763 }
764
765 let reason = format!("integration exhausted {} attempts", config.max_attempts);
766 if let Err(marker_err) = write_blocked_push_marker(
767 repo_root,
768 task_id,
769 &reason,
770 config.max_attempts,
771 config.max_attempts,
772 ) {
773 log::warn!("Failed to write blocked marker: {}", marker_err);
774 }
775 Ok(IntegrationOutcome::BlockedPush { reason })
776}
777
778fn wait_before_retry(
779 config: &IntegrationConfig,
780 attempt_index: usize,
781 task_id: &str,
782) -> Result<()> {
783 let delay = config.backoff_for_attempt(attempt_index);
784 log::info!(
785 "Integration retry backoff for {}: sleeping {}ms before next attempt",
786 task_id,
787 delay.as_millis()
788 );
789 sleep_with_cancellation(delay, None)
790 .map_err(|_| anyhow::anyhow!("integration retry cancelled for {}", task_id))
791}
792
#[cfg(test)]
mod tests {
    use super::*;
    use crate::contracts::{QueueFile, Task, TaskPriority, TaskStatus};
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Builds a task with the given id and status and fixed placeholder
    /// values for every other field.
    fn make_task(id: &str, status: TaskStatus) -> Task {
        Task {
            id: id.to_string(),
            title: format!("Task {}", id),
            description: None,
            status,
            priority: TaskPriority::Medium,
            tags: Vec::new(),
            scope: Vec::new(),
            evidence: Vec::new(),
            plan: Vec::new(),
            notes: Vec::new(),
            request: None,
            agent: None,
            created_at: Some("2026-01-01T00:00:00Z".to_string()),
            updated_at: Some("2026-01-01T00:00:00Z".to_string()),
            completed_at: None,
            started_at: None,
            scheduled_start: None,
            depends_on: Vec::new(),
            blocks: Vec::new(),
            relates_to: Vec::new(),
            duplicates: None,
            custom_fields: HashMap::new(),
            estimated_minutes: None,
            actual_minutes: None,
            parent_id: None,
        }
    }

    /// Queue file holding exactly one task.
    fn queue_containing(task: Task) -> QueueFile {
        let mut file = QueueFile::default();
        file.tasks.push(task);
        file
    }

    /// Resolved configuration with default settings and the given paths.
    fn resolved_for(
        repo_root: PathBuf,
        queue_path: PathBuf,
        done_path: PathBuf,
    ) -> crate::config::Resolved {
        crate::config::Resolved {
            config: crate::contracts::Config::default(),
            repo_root,
            queue_path,
            done_path,
            id_prefix: "RQ".to_string(),
            id_width: 4,
            global_config_path: None,
            project_config_path: None,
        }
    }

    /// Renders the integration prompt for task `RQ-0001`, attempt 1 of 5,
    /// with CI enabled and fixed queue/done paths.
    fn render_prompt(
        task_title: &str,
        target_branch: &str,
        phase_summary: &str,
        status_snapshot: &str,
        previous_failure: Option<&str>,
    ) -> String {
        build_agent_integration_prompt(
            "RQ-0001",
            task_title,
            target_branch,
            Path::new("/tmp/queue.json"),
            Path::new("/tmp/done.json"),
            1,
            5,
            phase_summary,
            status_snapshot,
            true,
            "make ci",
            previous_failure,
        )
    }

    /// Temporary layout with a coordinator directory and a separate worker
    /// workspace, each with its own queue/done file locations.
    struct SplitLayout {
        _dir: TempDir,
        worker_workspace: PathBuf,
        coordinator_queue: PathBuf,
        coordinator_done: PathBuf,
        workspace_queue: PathBuf,
        workspace_done: PathBuf,
    }

    fn split_layout() -> Result<SplitLayout> {
        let dir = TempDir::new()?;
        let coordinator = dir.path().join("coordinator");
        let worker_workspace = dir.path().join("worker-ws");
        std::fs::create_dir_all(&coordinator)?;
        std::fs::create_dir_all(worker_workspace.join(".ralph"))?;

        Ok(SplitLayout {
            coordinator_queue: coordinator.join("queue.json"),
            coordinator_done: coordinator.join("done.json"),
            workspace_queue: worker_workspace.join(".ralph/queue.json"),
            workspace_done: worker_workspace.join(".ralph/done.json"),
            worker_workspace,
            _dir: dir,
        })
    }

    #[test]
    fn integration_config_default_backoff() {
        let config = IntegrationConfig {
            max_attempts: 5,
            backoff_ms: vec![500, 2000, 5000, 10000],
            target_branch: "main".into(),
            ci_enabled: true,
            ci_label: "make ci".into(),
        };

        let expectations: [(usize, u64); 6] = [
            (0, 500),
            (1, 2000),
            (2, 5000),
            (3, 10000),
            (4, 10000),
            (10, 10000),
        ];
        for (attempt, expected_ms) in expectations.iter().copied() {
            assert_eq!(
                config.backoff_for_attempt(attempt),
                Duration::from_millis(expected_ms)
            );
        }
    }

    #[test]
    fn remediation_handoff_builder() {
        let handoff = RemediationHandoff::new("RQ-0001", "Test Task", "main", 2, 5)
            .with_conflicts(vec!["src/lib.rs".into(), "src/main.rs".into()])
            .with_git_status("UU src/lib.rs\nUU src/main.rs".into())
            .with_phase_summary("Implemented feature X".into())
            .with_task_intent("Complete feature X implementation".into());

        assert_eq!(handoff.task_id, "RQ-0001");
        assert_eq!(handoff.task_title, "Test Task");
        assert_eq!(handoff.target_branch, "main");
        assert_eq!(handoff.attempt, 2);
        assert_eq!(handoff.max_attempts, 5);
        assert_eq!(handoff.conflict_files.len(), 2);
        assert_eq!(handoff.phase_summary, "Implemented feature X");
        assert!(handoff.ci_context.is_none());
    }

    #[test]
    fn remediation_handoff_with_ci() {
        let handoff = RemediationHandoff::new("RQ-0001", "Test", "main", 1, 5).with_ci_context(
            "make ci".into(),
            "test failed".into(),
            1,
        );

        assert!(handoff.ci_context.is_some());
        let ci = handoff.ci_context.unwrap();
        assert_eq!(ci.command, "make ci");
        assert_eq!(ci.last_output, "test failed");
        assert_eq!(ci.exit_code, 1);
    }

    #[test]
    fn integration_prompt_contains_mandatory_contract() {
        let prompt = render_prompt(
            "Implement feature",
            "main",
            "phase summary",
            " M src/lib.rs",
            Some("previous failure"),
        );

        assert!(prompt.contains("MUST execute integration git operations"));
        assert!(prompt.contains("Completion Contract (Mandatory)"));
        assert!(prompt.contains("git push origin HEAD:main"));
        assert!(prompt.contains("previous failure"));
    }

    #[test]
    fn integration_prompt_uses_explicit_target_branch_for_push() {
        let prompt = render_prompt(
            "Implement feature",
            "release/2026",
            "phase summary",
            " M src/lib.rs",
            None,
        );

        assert!(prompt.contains("git fetch origin release/2026"));
        assert!(prompt.contains("git rebase origin/release/2026"));
        assert!(prompt.contains("git push origin HEAD:release/2026"));
    }

    #[test]
    fn integration_prompt_sanitizes_nul_bytes() {
        let prompt = render_prompt(
            "NUL test",
            "main",
            "phase\0summary",
            "status\0snapshot",
            Some("previous\0failure"),
        );

        assert!(!prompt.contains('\0'));
        assert!(prompt.contains("phase summary"));
        assert!(prompt.contains("status snapshot"));
        assert!(prompt.contains("previous failure"));
    }

    #[test]
    fn compliance_result_all_passed() {
        let passed = ComplianceResult {
            has_unresolved_conflicts: false,
            queue_done_valid: true,
            task_archived: true,
            ci_passed: true,
            conflict_files: vec![],
            validation_error: None,
        };
        assert!(passed.all_passed());

        let failed = ComplianceResult {
            has_unresolved_conflicts: false,
            queue_done_valid: true,
            task_archived: false,
            ci_passed: true,
            conflict_files: vec![],
            validation_error: None,
        };
        assert!(!failed.all_passed());
    }

    #[test]
    fn integration_config_uses_explicit_target_branch() -> Result<()> {
        let dir = tempfile::TempDir::new()?;
        let resolved = resolved_for(
            dir.path().to_path_buf(),
            dir.path().join(".ralph/queue.json"),
            dir.path().join(".ralph/done.json"),
        );

        let cfg = IntegrationConfig::from_resolved(&resolved, "release/2026");
        assert_eq!(cfg.target_branch, "release/2026");
        Ok(())
    }

    #[test]
    fn task_archived_validation_uses_resolved_paths_not_workspace_local_files() -> Result<()> {
        let layout = split_layout()?;

        // Coordinator files say the task is still queued ...
        queue::save_queue(
            &layout.coordinator_queue,
            &queue_containing(make_task("RQ-0001", TaskStatus::Todo)),
        )?;
        queue::save_queue(&layout.coordinator_done, &QueueFile::default())?;

        // ... while the workspace-local files say it is archived.
        queue::save_queue(&layout.workspace_queue, &QueueFile::default())?;
        queue::save_queue(
            &layout.workspace_done,
            &queue_containing(make_task("RQ-0001", TaskStatus::Done)),
        )?;

        let resolved = resolved_for(
            layout.worker_workspace.clone(),
            layout.coordinator_queue.clone(),
            layout.coordinator_done.clone(),
        );

        let err = validate_task_archived(&resolved, "RQ-0001")
            .expect_err("validation should use resolved queue path");
        let msg = err.to_string();
        assert!(
            msg.contains(layout.coordinator_queue.to_string_lossy().as_ref()),
            "error should reference resolved queue path, got: {msg}"
        );
        Ok(())
    }

    #[test]
    fn queue_done_semantics_validation_uses_resolved_paths() -> Result<()> {
        let layout = split_layout()?;

        // Coordinator queue holds a task whose id does not match the prefix.
        queue::save_queue(
            &layout.coordinator_queue,
            &queue_containing(make_task("BAD-ID", TaskStatus::Todo)),
        )?;
        queue::save_queue(&layout.coordinator_done, &QueueFile::default())?;

        // Workspace-local queue holds a well-formed task.
        queue::save_queue(
            &layout.workspace_queue,
            &queue_containing(make_task("RQ-0001", TaskStatus::Todo)),
        )?;
        queue::save_queue(&layout.workspace_done, &QueueFile::default())?;

        let resolved = resolved_for(
            layout.worker_workspace.clone(),
            layout.coordinator_queue.clone(),
            layout.coordinator_done.clone(),
        );

        validate_queue_done_semantics(&layout.worker_workspace, &resolved)
            .expect_err("validation should fail from resolved queue path");
        Ok(())
    }

    #[test]
    fn blocked_marker_roundtrip() -> Result<()> {
        let temp = TempDir::new()?;
        let workspace = temp.path();

        write_blocked_push_marker(workspace, "RQ-0001", "blocked reason", 5, 5)?;
        let marker = read_blocked_push_marker(workspace)?.expect("marker should exist");
        assert_eq!(marker.task_id, "RQ-0001");
        assert_eq!(marker.reason, "blocked reason");
        assert_eq!(marker.attempt, 5);
        assert_eq!(marker.max_attempts, 5);

        clear_blocked_push_marker(workspace);
        assert!(read_blocked_push_marker(workspace)?.is_none());
        Ok(())
    }
}