Skip to main content

ralph/commands/run/parallel/
integration.rs

1//! Worker integration loop for direct-push parallel mode.
2//!
3//! Responsibilities:
4//! - Drive a bounded integration loop through the phase continue-session.
5//! - Keep `fetch/rebase/conflict-fix/commit/push` execution agent-owned.
6//! - Enforce deterministic post-turn compliance gates before success.
7//! - Emit remediation handoff packets for blocked attempts.
8//!
9//! Not handled here:
10//! - Phase execution itself (see `run_one` phase modules).
11//! - Worker spawning/orchestration (see `worker.rs` and `orchestration.rs`).
12//!
13//! Invariants/assumptions:
14//! - Called after the worker has completed its configured phases.
15//! - Called only in parallel-worker mode.
16//! - Single-mode (`ralph run one` without `--parallel-worker`) is unchanged.
17
18#![allow(dead_code)]
19
20use crate::commands::run::supervision::{ContinueSession, resume_continue_session};
21use crate::config::Resolved;
22use crate::contracts::TaskStatus;
23use crate::git;
24use crate::queue;
25use crate::timeutil;
26use anyhow::{Context, Result, bail};
27use serde::{Deserialize, Serialize};
28use std::path::{Path, PathBuf};
29use std::process::Command;
30use std::time::Duration;
31
32// =============================================================================
33// Integration Loop Configuration
34// =============================================================================
35
36/// Configuration for the integration loop.
37#[derive(Debug, Clone)]
38pub struct IntegrationConfig {
39    /// Maximum number of integration attempts.
40    pub max_attempts: u32,
41    /// Backoff intervals between retries (in milliseconds).
42    pub backoff_ms: Vec<u64>,
43    /// Target branch to push to.
44    pub target_branch: String,
45    /// CI gate command (if enabled).
46    pub ci_command: Option<String>,
47    /// Whether CI gate is enabled.
48    pub ci_enabled: bool,
49}
50
51impl IntegrationConfig {
52    pub fn from_resolved(resolved: &Resolved, target_branch: &str) -> Self {
53        let parallel = &resolved.config.parallel;
54        let target_branch = target_branch.trim();
55        Self {
56            max_attempts: parallel.max_push_attempts.unwrap_or(50) as u32,
57            backoff_ms: parallel
58                .push_backoff_ms
59                .clone()
60                .unwrap_or_else(super::default_push_backoff_ms),
61            target_branch: if target_branch.is_empty() {
62                "main".to_string()
63            } else {
64                target_branch.to_string()
65            },
66            ci_command: resolved.config.agent.ci_gate_command.clone(),
67            ci_enabled: resolved.config.agent.ci_gate_enabled.unwrap_or(true),
68        }
69    }
70
71    /// Get backoff for a specific attempt index (0-indexed).
72    pub fn backoff_for_attempt(&self, attempt: usize) -> Duration {
73        let ms = self
74            .backoff_ms
75            .get(attempt)
76            .copied()
77            .unwrap_or_else(|| self.backoff_ms.last().copied().unwrap_or(10_000));
78        Duration::from_millis(ms)
79    }
80}
81
82// =============================================================================
83// Integration Outcome
84// =============================================================================
85
/// Outcome of the integration loop.
///
/// Distinguishes bounded-retry exhaustion (`BlockedPush`, which also leaves a
/// persisted blocked-push marker on disk) from terminal failures (`Failed`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntegrationOutcome {
    /// Push succeeded and compliance gates passed.
    Success,
    /// Integration could not complete within bounded retries.
    BlockedPush { reason: String },
    /// Terminal integration failure (for example no resumable session).
    Failed { reason: String },
}
96
/// Persisted marker written by workers when integration ends in `blocked_push`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BlockedPushMarker {
    /// Identifier of the task whose integration was blocked (stored trimmed).
    pub task_id: String,
    /// Human-readable explanation of why integration was blocked.
    pub reason: String,
    /// Attempt number at which integration gave up (1-indexed).
    pub attempt: u32,
    /// Configured maximum number of attempts.
    pub max_attempts: u32,
    /// Timestamp (RFC 3339 UTC, best effort) of when the marker was written.
    pub generated_at: String,
}
106
107// =============================================================================
108// Handoff Packet
109// =============================================================================
110
/// Structured handoff packet for blocked remediation attempts.
///
/// Persisted as pretty-printed JSON by `write_handoff_packet` under
/// `.ralph/cache/parallel/handoffs/<task_id>/attempt_<n>.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemediationHandoff {
    /// Task identifier.
    pub task_id: String,
    /// Task title.
    pub task_title: String,
    /// Target branch for the push.
    pub target_branch: String,
    /// Current attempt number.
    pub attempt: u32,
    /// Maximum attempts allowed.
    pub max_attempts: u32,
    /// List of files with unresolved conflicts.
    pub conflict_files: Vec<String>,
    /// Current git status output.
    pub git_status: String,
    /// Phase outputs summary (last phase response summary).
    pub phase_summary: String,
    /// Original task intent snapshot.
    pub task_intent: String,
    /// CI context when validation failed on CI.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ci_context: Option<CiContext>,
    /// Timestamp when handoff was generated.
    pub generated_at: String,
    /// Queue/done semantic rules for conflict resolution.
    pub queue_done_rules: QueueDoneRules,
}
140
/// CI context for remediation handoff.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CiContext {
    /// Shell command used for the CI gate (e.g. `make ci`).
    pub command: String,
    /// Captured output/error text from the failed CI run.
    pub last_output: String,
    /// Exit code recorded for the CI failure.
    pub exit_code: i32,
}
148
149/// Semantic rules for queue/done conflict resolution.
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct QueueDoneRules {
152    pub rules: Vec<String>,
153}
154
155impl Default for QueueDoneRules {
156    fn default() -> Self {
157        Self {
158            rules: vec![
159                "Remove the completed task from the queue file".into(),
160                "Ensure the completed task is present in the done archive file".into(),
161                "Preserve entries from other workers exactly".into(),
162                "Preserve task metadata (timestamps/notes)".into(),
163            ],
164        }
165    }
166}
167
168impl RemediationHandoff {
169    pub fn new(
170        task_id: impl Into<String>,
171        task_title: impl Into<String>,
172        target_branch: impl Into<String>,
173        attempt: u32,
174        max_attempts: u32,
175    ) -> Self {
176        Self {
177            task_id: task_id.into(),
178            task_title: task_title.into(),
179            target_branch: target_branch.into(),
180            attempt,
181            max_attempts,
182            conflict_files: Vec::new(),
183            git_status: String::new(),
184            phase_summary: String::new(),
185            task_intent: String::new(),
186            ci_context: None,
187            generated_at: timeutil::now_utc_rfc3339_or_fallback(),
188            queue_done_rules: QueueDoneRules::default(),
189        }
190    }
191
192    pub fn with_conflicts(mut self, files: Vec<String>) -> Self {
193        self.conflict_files = files;
194        self
195    }
196
197    pub fn with_git_status(mut self, status: String) -> Self {
198        self.git_status = status;
199        self
200    }
201
202    pub fn with_phase_summary(mut self, summary: String) -> Self {
203        self.phase_summary = summary;
204        self
205    }
206
207    pub fn with_task_intent(mut self, intent: String) -> Self {
208        self.task_intent = intent;
209        self
210    }
211
212    pub fn with_ci_context(mut self, command: String, last_output: String, exit_code: i32) -> Self {
213        self.ci_context = Some(CiContext {
214            command,
215            last_output,
216            exit_code,
217        });
218        self
219    }
220}
221
/// Absolute path of the blocked-push marker file inside a workspace.
fn blocked_push_marker_path(workspace_path: &Path) -> PathBuf {
    workspace_path.join(super::BLOCKED_PUSH_MARKER_FILE)
}
225
226fn write_blocked_push_marker(
227    workspace_path: &Path,
228    task_id: &str,
229    reason: &str,
230    attempt: u32,
231    max_attempts: u32,
232) -> Result<()> {
233    let marker = BlockedPushMarker {
234        task_id: task_id.trim().to_string(),
235        reason: reason.to_string(),
236        attempt,
237        max_attempts,
238        generated_at: timeutil::now_utc_rfc3339_or_fallback(),
239    };
240    let marker_path = blocked_push_marker_path(workspace_path);
241    if let Some(parent) = marker_path.parent() {
242        std::fs::create_dir_all(parent)
243            .with_context(|| format!("create blocked marker directory {}", parent.display()))?;
244    }
245    let rendered = serde_json::to_string_pretty(&marker).context("serialize blocked marker")?;
246    crate::fsutil::write_atomic(&marker_path, rendered.as_bytes())
247        .with_context(|| format!("write blocked marker {}", marker_path.display()))?;
248    Ok(())
249}
250
251fn clear_blocked_push_marker(workspace_path: &Path) {
252    let marker_path = blocked_push_marker_path(workspace_path);
253    if !marker_path.exists() {
254        return;
255    }
256    if let Err(err) = std::fs::remove_file(&marker_path) {
257        log::warn!(
258            "Failed to clear blocked marker at {}: {}",
259            marker_path.display(),
260            err
261        );
262    }
263}
264
265pub(crate) fn read_blocked_push_marker(workspace_path: &Path) -> Result<Option<BlockedPushMarker>> {
266    let marker_path = blocked_push_marker_path(workspace_path);
267    if !marker_path.exists() {
268        return Ok(None);
269    }
270    let raw = std::fs::read_to_string(&marker_path)
271        .with_context(|| format!("read blocked marker {}", marker_path.display()))?;
272    let marker =
273        serde_json::from_str::<BlockedPushMarker>(&raw).context("parse blocked marker json")?;
274    Ok(Some(marker))
275}
276
277/// Write handoff packet to workspace cache directory.
278pub fn write_handoff_packet(
279    workspace_path: &Path,
280    task_id: &str,
281    attempt: u32,
282    handoff: &RemediationHandoff,
283) -> Result<PathBuf> {
284    let handoff_dir = workspace_path
285        .join(".ralph/cache/parallel/handoffs")
286        .join(task_id);
287    std::fs::create_dir_all(&handoff_dir)
288        .with_context(|| format!("create handoff directory {}", handoff_dir.display()))?;
289
290    let path = handoff_dir.join(format!("attempt_{}.json", attempt));
291    let content = serde_json::to_string_pretty(handoff).context("serialize handoff packet")?;
292    crate::fsutil::write_atomic(&path, content.as_bytes())
293        .with_context(|| format!("write handoff packet to {}", path.display()))?;
294    Ok(path)
295}
296
297// =============================================================================
298// Compliance Checks
299// =============================================================================
300
/// Result of deterministic compliance checks.
#[derive(Debug, Clone)]
pub struct ComplianceResult {
    /// True when merge-conflict markers remain in the worktree.
    pub has_unresolved_conflicts: bool,
    /// True when queue/done semantic validation succeeded.
    pub queue_done_valid: bool,
    /// True when the task was removed from the queue and archived as done.
    pub task_archived: bool,
    /// True when the CI gate passed (or was disabled).
    pub ci_passed: bool,
    /// Files still carrying unresolved conflicts.
    pub conflict_files: Vec<String>,
    /// Joined description of every failed check, if any.
    pub validation_error: Option<String>,
}

impl ComplianceResult {
    /// True only when every deterministic gate passed: no unresolved
    /// conflicts, valid queue/done semantics, task archived, and CI green.
    pub fn all_passed(&self) -> bool {
        let gates = [
            !self.has_unresolved_conflicts,
            self.queue_done_valid,
            self.task_archived,
            self.ci_passed,
        ];
        gates.into_iter().all(|gate| gate)
    }
}
320
321/// Run deterministic compliance checks after each agent integration turn.
322pub fn run_compliance_checks(
323    repo_root: &Path,
324    resolved: &Resolved,
325    task_id: &str,
326    ci_enabled: bool,
327) -> Result<ComplianceResult> {
328    let conflict_files = git::list_conflict_files(repo_root)?;
329    let has_unresolved_conflicts = !conflict_files.is_empty();
330
331    let mut errors = Vec::new();
332    if has_unresolved_conflicts {
333        errors.push("unresolved merge conflicts remain".to_string());
334    }
335
336    let queue_done_valid = match validate_queue_done_semantics(repo_root, resolved) {
337        Ok(()) => true,
338        Err(err) => {
339            errors.push(format!("queue/done semantic validation failed: {}", err));
340            false
341        }
342    };
343
344    let task_archived = match validate_task_archived(resolved, task_id) {
345        Ok(()) => true,
346        Err(err) => {
347            errors.push(format!("task archival validation failed: {}", err));
348            false
349        }
350    };
351
352    let ci_passed = if ci_enabled {
353        match run_ci_check(repo_root, resolved) {
354            Ok(()) => true,
355            Err(err) => {
356                errors.push(format!("CI gate failed: {}", err));
357                false
358            }
359        }
360    } else {
361        true
362    };
363
364    Ok(ComplianceResult {
365        has_unresolved_conflicts,
366        queue_done_valid,
367        task_archived,
368        ci_passed,
369        conflict_files,
370        validation_error: if errors.is_empty() {
371            None
372        } else {
373            Some(errors.join("; "))
374        },
375    })
376}
377
378/// Validate queue/done files semantically from the resolved queue/done paths.
379fn validate_queue_done_semantics(_repo_root: &Path, resolved: &Resolved) -> Result<()> {
380    let queue_path = resolved.queue_path.clone();
381    let done_path = resolved.done_path.clone();
382
383    let queue = queue::load_queue(&queue_path).context("load queue for validation")?;
384    let max_depth = resolved.config.queue.max_dependency_depth.unwrap_or(10);
385    let done = if done_path.exists() {
386        Some(queue::load_queue(&done_path).context("load done for validation")?)
387    } else {
388        None
389    };
390
391    queue::validate_queue_set(
392        &queue,
393        done.as_ref(),
394        &resolved.id_prefix,
395        resolved.id_width,
396        max_depth,
397    )
398    .context("queue/done semantic validation")?;
399
400    Ok(())
401}
402
403/// Validate that the specific task is removed from queue and present as done.
404fn validate_task_archived(resolved: &Resolved, task_id: &str) -> Result<()> {
405    let task_id = task_id.trim();
406    if task_id.is_empty() {
407        bail!("task id is empty");
408    }
409
410    let queue_path = resolved.queue_path.clone();
411    let done_path = resolved.done_path.clone();
412
413    if !queue_path.exists() {
414        bail!("queue file missing at {}", queue_path.display());
415    }
416    if !done_path.exists() {
417        bail!("done file missing at {}", done_path.display());
418    }
419
420    let queue_file = queue::load_queue(&queue_path)
421        .with_context(|| format!("load queue file {}", queue_path.display()))?;
422    if queue_file
423        .tasks
424        .iter()
425        .any(|task| task.id.trim() == task_id)
426    {
427        bail!("task {} still present in {}", task_id, queue_path.display());
428    }
429
430    let done_file = queue::load_queue(&done_path)
431        .with_context(|| format!("load done file {}", done_path.display()))?;
432    let done_task = done_file
433        .tasks
434        .iter()
435        .find(|task| task.id.trim() == task_id)
436        .ok_or_else(|| anyhow::anyhow!("task {} missing from {}", task_id, done_path.display()))?;
437
438    if done_task.status != TaskStatus::Done {
439        bail!(
440            "task {} exists in done but status is {:?}, expected done",
441            task_id,
442            done_task.status
443        );
444    }
445
446    Ok(())
447}
448
449/// Run CI gate check as deterministic validation.
450fn run_ci_check(repo_root: &Path, resolved: &Resolved) -> Result<()> {
451    let ci_command = resolved
452        .config
453        .agent
454        .ci_gate_command
455        .as_deref()
456        .unwrap_or("make ci");
457
458    log::info!(
459        "Running CI gate validation (may take several minutes): {}",
460        ci_command
461    );
462    let started = std::time::Instant::now();
463
464    let output = Command::new("sh")
465        .arg("-c")
466        .arg(ci_command)
467        .current_dir(repo_root)
468        .output()
469        .context("spawn CI gate command")?;
470
471    log::info!(
472        "CI gate validation finished in {:.1}s with exit code {:?}",
473        started.elapsed().as_secs_f64(),
474        output.status.code()
475    );
476
477    if !output.status.success() {
478        let stderr = String::from_utf8_lossy(&output.stderr);
479        let stdout = String::from_utf8_lossy(&output.stdout);
480        let combined = format!("{}\n{}", stdout, stderr).to_lowercase();
481        if combined.contains("waiting for file lock")
482            || combined.contains("file lock on build directory")
483        {
484            bail!(
485                "CI lock contention detected (stale build/test process likely holding a lock). {} | {}",
486                stdout.trim(),
487                stderr.trim()
488            );
489        }
490        bail!("{} | {}", stdout.trim(), stderr.trim());
491    }
492
493    Ok(())
494}
495
496/// Verify that local HEAD is integrated into `origin/<target_branch>`.
497fn head_is_synced_to_remote(repo_root: &Path, target_branch: &str) -> Result<bool> {
498    git::fetch_branch(repo_root, "origin", target_branch)
499        .with_context(|| format!("fetch origin/{} for sync check", target_branch))?;
500
501    let remote_ref = format!("origin/{}", target_branch);
502    let output = Command::new("git")
503        .args(["merge-base", "--is-ancestor", "HEAD", &remote_ref])
504        .current_dir(repo_root)
505        .output()
506        .with_context(|| format!("check if HEAD is ancestor of {}", remote_ref))?;
507
508    if output.status.success() {
509        return Ok(true);
510    }
511    if output.status.code() == Some(1) {
512        return Ok(false);
513    }
514
515    let stderr = String::from_utf8_lossy(&output.stderr);
516    bail!(
517        "unable to verify push sync against {}: {}",
518        remote_ref,
519        stderr.trim()
520    );
521}
522
523// =============================================================================
524// Integration Prompting
525// =============================================================================
526
/// Build the agent-facing integration prompt for a single attempt.
///
/// The prompt instructs the agent to run the full fetch/rebase/conflict-fix/
/// commit/push sequence itself and restates the completion contract that
/// `run_compliance_checks` later verifies deterministically. When
/// `previous_failure` is set, the prior attempt's block reason is embedded so
/// the agent can address it. The rendered text is sanitized before return.
#[allow(clippy::too_many_arguments)]
fn build_agent_integration_prompt(
    task_id: &str,
    task_title: &str,
    target_branch: &str,
    queue_path: &Path,
    done_path: &Path,
    attempt: u32,
    max_attempts: u32,
    phase_summary: &str,
    status_snapshot: &str,
    ci_enabled: bool,
    ci_command: Option<&str>,
    previous_failure: Option<&str>,
) -> String {
    let queue_path_display = queue_path.display();
    let done_path_display = done_path.display();
    // Rendered only on retries: surfaces the prior attempt's block reason.
    let failure_block = previous_failure.map_or_else(String::new, |failure| {
        format!("\n## Previous Attempt Failed\n{}\n", failure)
    });

    // Step 6 of the sequence swaps to a notice when the CI gate is disabled.
    let ci_block = if ci_enabled {
        format!(
            "- Run CI gate and fix failures before pushing: `{}`",
            ci_command.unwrap_or("make ci")
        )
    } else {
        "- CI gate is disabled for this task".to_string()
    };

    // Control characters (other than \n, \r, \t) are replaced with spaces so
    // the prompt survives transport to the runner.
    sanitize_prompt_for_runner(&format!(
        r#"# Parallel Integration (Mandatory) - Attempt {attempt}/{max_attempts}
You are finalizing task `{task_id}` (`{task_title}`) for direct push to `origin/{target_branch}`.

## Hard Requirement
You MUST execute integration git operations yourself in this turn. Do not stop early.
You are NOT done until all required checks are satisfied.

## Context
- Phase summary: {phase_summary}
- Current git status snapshot:
```text
{status_snapshot}
```
{failure_block}
## Required Sequence
1. `git fetch origin {target_branch}`
2. Rebase on latest remote state: `git rebase origin/{target_branch}`
3. If conflicts exist:
   - Resolve every conflict marker while preserving both upstream and task intent.
   - For queue/done files, preserve other workers' entries exactly.
   - Ensure `{task_id}` is removed from queue and present as done in done.
   - Continue rebase until complete (`git add ...`, `git rebase --continue`).
4. Ensure bookkeeping is correct:
   - `{queue_path_display}` does NOT contain `{task_id}`
   - `{done_path_display}` DOES contain `{task_id}` with done status
5. Stage and commit any remaining changes needed for integration.
6. {ci_block}
7. Push directly to base branch: `git push origin HEAD:{target_branch}`
8. If push is rejected (non-fast-forward), repeat from step 1 in this same turn.

## Completion Contract (Mandatory)
Before ending your response:
- No unresolved merge conflicts remain.
- Push to `origin/{target_branch}` has succeeded.
- Bookkeeping files are semantically correct for `{task_id}`.
- CI has passed when enabled.

If any check fails, keep working in this same turn until fixed.
"#
    ))
}
599
/// Replace control characters with spaces, preserving `\n`, `\r`, and `\t`.
fn sanitize_prompt_for_runner(prompt: &str) -> String {
    let keeps = |c: char| matches!(c, '\n' | '\r' | '\t') || !c.is_control();
    prompt
        .chars()
        .map(|c| if keeps(c) { c } else { ' ' })
        .collect()
}
612
613fn compose_block_reason(
614    compliance: &ComplianceResult,
615    pushed: bool,
616    extra: Option<&str>,
617) -> String {
618    let mut reasons = Vec::new();
619
620    if compliance.has_unresolved_conflicts {
621        reasons.push(format!(
622            "unresolved conflicts: {}",
623            compliance.conflict_files.join(", ")
624        ));
625    }
626    if !compliance.queue_done_valid {
627        reasons.push("queue/done semantic validation failed".to_string());
628    }
629    if !compliance.task_archived {
630        reasons.push("task archival validation failed".to_string());
631    }
632    if !compliance.ci_passed {
633        reasons.push("CI validation failed".to_string());
634    }
635    if !pushed {
636        reasons.push("HEAD is not yet integrated into target branch".to_string());
637    }
638    if let Some(extra) = extra {
639        reasons.push(extra.to_string());
640    }
641
642    if let Some(validation_error) = &compliance.validation_error {
643        reasons.push(validation_error.clone());
644    }
645
646    if reasons.is_empty() {
647        "integration did not satisfy completion contract".to_string()
648    } else {
649        reasons.join("; ")
650    }
651}
652
653// =============================================================================
654// Integration Loop
655// =============================================================================
656
/// Run the integration loop for a completed worker.
///
/// Integration actions are agent-owned via continue-session prompts.
/// Ralph only validates completion and retries when contract checks fail.
///
/// Per attempt: build a prompt from the current git status and any prior
/// failure, resume the agent session, then run the deterministic compliance
/// checks plus a HEAD-vs-remote sync check; success requires both. On a
/// failed attempt a remediation handoff packet is persisted and the loop
/// backs off before retrying. When the final attempt fails, a blocked-push
/// marker is written and `BlockedPush` is returned.
///
/// # Errors
/// Propagates errors from the `on_resume` callback and from running the
/// compliance checks; resume failures themselves are retried, not returned.
#[allow(clippy::too_many_arguments)]
pub(crate) fn run_integration_loop(
    resolved: &Resolved,
    task_id: &str,
    task_title: &str,
    config: &IntegrationConfig,
    phase_summary: &str,
    continue_session: &mut ContinueSession,
    on_resume: &mut dyn FnMut(&crate::runner::RunnerOutput, Duration) -> Result<()>,
    plugins: Option<&crate::plugins::registry::PluginRegistry>,
) -> Result<IntegrationOutcome> {
    let repo_root = &resolved.repo_root;
    // Start fresh: a marker left by a previous run must not outlive this loop.
    clear_blocked_push_marker(repo_root);
    let mut previous_failure: Option<String> = None;

    for attempt_index in 0..config.max_attempts {
        // 1-indexed attempt number for prompts, markers, and logs.
        let attempt = attempt_index + 1;
        log::info!(
            "Agent-owned integration attempt {}/{} for {}",
            attempt,
            config.max_attempts,
            task_id
        );

        let status_snapshot = git::status_porcelain(repo_root).unwrap_or_default();
        let prompt = build_agent_integration_prompt(
            task_id,
            task_title,
            &config.target_branch,
            &resolved.queue_path,
            &resolved.done_path,
            attempt,
            config.max_attempts,
            phase_summary,
            &status_snapshot,
            config.ci_enabled,
            config.ci_command.as_deref(),
            previous_failure.as_deref(),
        );

        // A failed resume is itself retryable; only on the final attempt does
        // it become a blocked-push outcome (with a persisted marker).
        let (output, elapsed) =
            match resume_continue_session(resolved, continue_session, &prompt, plugins) {
                Ok(resume) => resume,
                Err(err) => {
                    let reason = format!("integration continuation failed: {:#}", err);
                    if attempt >= config.max_attempts {
                        if let Err(marker_err) = write_blocked_push_marker(
                            repo_root,
                            task_id,
                            &reason,
                            attempt,
                            config.max_attempts,
                        ) {
                            log::warn!("Failed to write blocked marker: {}", marker_err);
                        }
                        return Ok(IntegrationOutcome::BlockedPush { reason });
                    }
                    previous_failure = Some(reason);
                    std::thread::sleep(config.backoff_for_attempt(attempt_index as usize));
                    continue;
                }
            };

        on_resume(&output, elapsed)?;

        // Deterministic gates: compliance checks plus proof that HEAD is an
        // ancestor of origin/<target_branch>. A sync-check error is treated
        // as "not pushed" and carried into the block reason.
        let compliance = run_compliance_checks(repo_root, resolved, task_id, config.ci_enabled)?;
        let (pushed, push_check_error) =
            match head_is_synced_to_remote(repo_root, &config.target_branch) {
                Ok(value) => (value, None),
                Err(err) => (false, Some(format!("push sync validation failed: {}", err))),
            };

        if compliance.all_passed() && pushed {
            log::info!(
                "Integration succeeded for {} on attempt {}/{}",
                task_id,
                attempt,
                config.max_attempts
            );
            return Ok(IntegrationOutcome::Success);
        }

        // Persist a handoff packet describing why this attempt was blocked so
        // remediation has full context; failures to persist are non-fatal.
        let reason = compose_block_reason(&compliance, pushed, push_check_error.as_deref());
        let mut handoff = RemediationHandoff::new(
            task_id,
            task_title,
            &config.target_branch,
            attempt,
            config.max_attempts,
        )
        .with_conflicts(compliance.conflict_files.clone())
        .with_git_status(git::status_porcelain(repo_root).unwrap_or_default())
        .with_phase_summary(phase_summary.to_string())
        .with_task_intent(format!("Complete task {}: {}", task_id, task_title));

        if !compliance.ci_passed {
            // NOTE(review): exit code is hard-coded to 1 because run_ci_check
            // does not surface the real CI exit code — confirm acceptable.
            handoff = handoff.with_ci_context(
                config
                    .ci_command
                    .clone()
                    .unwrap_or_else(|| "make ci".into()),
                compliance
                    .validation_error
                    .clone()
                    .unwrap_or_else(|| "CI gate validation failed".to_string()),
                1,
            );
        }

        if let Err(err) = write_handoff_packet(repo_root, task_id, attempt, &handoff) {
            log::warn!("Failed to persist remediation handoff packet: {}", err);
        }

        if attempt >= config.max_attempts {
            if let Err(marker_err) =
                write_blocked_push_marker(repo_root, task_id, &reason, attempt, config.max_attempts)
            {
                log::warn!("Failed to write blocked marker: {}", marker_err);
            }
            return Ok(IntegrationOutcome::BlockedPush { reason });
        }

        previous_failure = Some(reason);
        std::thread::sleep(config.backoff_for_attempt(attempt_index as usize));
    }

    // Defensive fallthrough: reachable only when max_attempts is 0 and the
    // loop body never ran.
    let reason = format!("integration exhausted {} attempts", config.max_attempts);
    if let Err(marker_err) = write_blocked_push_marker(
        repo_root,
        task_id,
        &reason,
        config.max_attempts,
        config.max_attempts,
    ) {
        log::warn!("Failed to write blocked marker: {}", marker_err);
    }
    Ok(IntegrationOutcome::BlockedPush { reason })
}
799
800#[cfg(test)]
801mod tests {
802    use super::*;
803    use crate::contracts::{QueueFile, Task, TaskPriority, TaskStatus};
804    use std::collections::HashMap;
805    use tempfile::TempDir;
806
    /// Build a minimal `Task` fixture with the given id and status; every
    /// optional field is left empty/None.
    fn make_task(id: &str, status: TaskStatus) -> Task {
        Task {
            id: id.to_string(),
            title: format!("Task {}", id),
            description: None,
            status,
            priority: TaskPriority::Medium,
            tags: vec![],
            scope: vec![],
            evidence: vec![],
            plan: vec![],
            notes: vec![],
            request: None,
            agent: None,
            created_at: Some("2026-01-01T00:00:00Z".to_string()),
            updated_at: Some("2026-01-01T00:00:00Z".to_string()),
            completed_at: None,
            started_at: None,
            scheduled_start: None,
            depends_on: vec![],
            blocks: vec![],
            relates_to: vec![],
            duplicates: None,
            custom_fields: HashMap::new(),
            estimated_minutes: None,
            actual_minutes: None,
            parent_id: None,
        }
    }
836
837    #[test]
838    fn integration_config_default_backoff() {
839        let config = IntegrationConfig {
840            max_attempts: 5,
841            backoff_ms: vec![500, 2000, 5000, 10000],
842            target_branch: "main".into(),
843            ci_command: Some("make ci".into()),
844            ci_enabled: true,
845        };
846
847        assert_eq!(config.backoff_for_attempt(0), Duration::from_millis(500));
848        assert_eq!(config.backoff_for_attempt(1), Duration::from_millis(2000));
849        assert_eq!(config.backoff_for_attempt(2), Duration::from_millis(5000));
850        assert_eq!(config.backoff_for_attempt(3), Duration::from_millis(10000));
851        assert_eq!(config.backoff_for_attempt(4), Duration::from_millis(10000));
852        assert_eq!(config.backoff_for_attempt(10), Duration::from_millis(10000));
853    }
854
855    #[test]
856    fn remediation_handoff_builder() {
857        let handoff = RemediationHandoff::new("RQ-0001", "Test Task", "main", 2, 5)
858            .with_conflicts(vec!["src/lib.rs".into(), "src/main.rs".into()])
859            .with_git_status("UU src/lib.rs\nUU src/main.rs".into())
860            .with_phase_summary("Implemented feature X".into())
861            .with_task_intent("Complete feature X implementation".into());
862
863        assert_eq!(handoff.task_id, "RQ-0001");
864        assert_eq!(handoff.task_title, "Test Task");
865        assert_eq!(handoff.target_branch, "main");
866        assert_eq!(handoff.attempt, 2);
867        assert_eq!(handoff.max_attempts, 5);
868        assert_eq!(handoff.conflict_files.len(), 2);
869        assert_eq!(handoff.phase_summary, "Implemented feature X");
870        assert!(handoff.ci_context.is_none());
871    }
872
873    #[test]
874    fn remediation_handoff_with_ci() {
875        let handoff = RemediationHandoff::new("RQ-0001", "Test", "main", 1, 5).with_ci_context(
876            "make ci".into(),
877            "test failed".into(),
878            1,
879        );
880
881        assert!(handoff.ci_context.is_some());
882        let ci = handoff.ci_context.unwrap();
883        assert_eq!(ci.command, "make ci");
884        assert_eq!(ci.last_output, "test failed");
885        assert_eq!(ci.exit_code, 1);
886    }
887
888    #[test]
889    fn integration_prompt_contains_mandatory_contract() {
890        let prompt = build_agent_integration_prompt(
891            "RQ-0001",
892            "Implement feature",
893            "main",
894            Path::new("/tmp/queue.json"),
895            Path::new("/tmp/done.json"),
896            1,
897            5,
898            "phase summary",
899            " M src/lib.rs",
900            true,
901            Some("make ci"),
902            Some("previous failure"),
903        );
904
905        assert!(prompt.contains("MUST execute integration git operations"));
906        assert!(prompt.contains("Completion Contract (Mandatory)"));
907        assert!(prompt.contains("git push origin HEAD:main"));
908        assert!(prompt.contains("previous failure"));
909    }
910
911    #[test]
912    fn integration_prompt_uses_explicit_target_branch_for_push() {
913        let prompt = build_agent_integration_prompt(
914            "RQ-0001",
915            "Implement feature",
916            "release/2026",
917            Path::new("/tmp/queue.json"),
918            Path::new("/tmp/done.json"),
919            1,
920            5,
921            "phase summary",
922            " M src/lib.rs",
923            true,
924            Some("make ci"),
925            None,
926        );
927
928        assert!(prompt.contains("git fetch origin release/2026"));
929        assert!(prompt.contains("git rebase origin/release/2026"));
930        assert!(prompt.contains("git push origin HEAD:release/2026"));
931    }
932
933    #[test]
934    fn integration_prompt_sanitizes_nul_bytes() {
935        let prompt = build_agent_integration_prompt(
936            "RQ-0001",
937            "NUL test",
938            "main",
939            Path::new("/tmp/queue.json"),
940            Path::new("/tmp/done.json"),
941            1,
942            5,
943            "phase\0summary",
944            "status\0snapshot",
945            true,
946            Some("make ci"),
947            Some("previous\0failure"),
948        );
949
950        assert!(!prompt.contains('\0'));
951        assert!(prompt.contains("phase summary"));
952        assert!(prompt.contains("status snapshot"));
953        assert!(prompt.contains("previous failure"));
954    }
955
956    #[test]
957    fn compliance_result_all_passed() {
958        let passed = ComplianceResult {
959            has_unresolved_conflicts: false,
960            queue_done_valid: true,
961            task_archived: true,
962            ci_passed: true,
963            conflict_files: vec![],
964            validation_error: None,
965        };
966        assert!(passed.all_passed());
967
968        let failed = ComplianceResult {
969            has_unresolved_conflicts: false,
970            queue_done_valid: true,
971            task_archived: false,
972            ci_passed: true,
973            conflict_files: vec![],
974            validation_error: None,
975        };
976        assert!(!failed.all_passed());
977    }
978
979    #[test]
980    fn integration_config_uses_explicit_target_branch() -> Result<()> {
981        let dir = tempfile::TempDir::new()?;
982        let resolved = crate::config::Resolved {
983            config: crate::contracts::Config::default(),
984            repo_root: dir.path().to_path_buf(),
985            queue_path: dir.path().join(".ralph/queue.json"),
986            done_path: dir.path().join(".ralph/done.json"),
987            id_prefix: "RQ".to_string(),
988            id_width: 4,
989            global_config_path: None,
990            project_config_path: None,
991        };
992
993        let cfg = IntegrationConfig::from_resolved(&resolved, "release/2026");
994        assert_eq!(cfg.target_branch, "release/2026");
995        Ok(())
996    }
997
998    #[test]
999    fn task_archived_validation_uses_resolved_paths_not_workspace_local_files() -> Result<()> {
1000        let dir = TempDir::new()?;
1001        let coordinator = dir.path().join("coordinator");
1002        let worker_workspace = dir.path().join("worker-ws");
1003        std::fs::create_dir_all(&coordinator)?;
1004        std::fs::create_dir_all(worker_workspace.join(".ralph"))?;
1005
1006        let coordinator_queue = coordinator.join("queue.json");
1007        let coordinator_done = coordinator.join("done.json");
1008        let workspace_queue = worker_workspace.join(".ralph/queue.json");
1009        let workspace_done = worker_workspace.join(".ralph/done.json");
1010
1011        let mut coordinator_queue_file = QueueFile::default();
1012        coordinator_queue_file
1013            .tasks
1014            .push(make_task("RQ-0001", TaskStatus::Todo));
1015        queue::save_queue(&coordinator_queue, &coordinator_queue_file)?;
1016        queue::save_queue(&coordinator_done, &QueueFile::default())?;
1017
1018        // Workspace-local files look archived, but should be ignored by validation.
1019        queue::save_queue(&workspace_queue, &QueueFile::default())?;
1020        let mut workspace_done_file = QueueFile::default();
1021        workspace_done_file
1022            .tasks
1023            .push(make_task("RQ-0001", TaskStatus::Done));
1024        queue::save_queue(&workspace_done, &workspace_done_file)?;
1025
1026        let resolved = crate::config::Resolved {
1027            config: crate::contracts::Config::default(),
1028            repo_root: worker_workspace,
1029            queue_path: coordinator_queue.clone(),
1030            done_path: coordinator_done,
1031            id_prefix: "RQ".to_string(),
1032            id_width: 4,
1033            global_config_path: None,
1034            project_config_path: None,
1035        };
1036
1037        let err = validate_task_archived(&resolved, "RQ-0001")
1038            .expect_err("validation should use resolved queue path");
1039        let msg = err.to_string();
1040        assert!(
1041            msg.contains(coordinator_queue.to_string_lossy().as_ref()),
1042            "error should reference resolved queue path, got: {msg}"
1043        );
1044        Ok(())
1045    }
1046
1047    #[test]
1048    fn queue_done_semantics_validation_uses_resolved_paths() -> Result<()> {
1049        let dir = TempDir::new()?;
1050        let coordinator = dir.path().join("coordinator");
1051        let worker_workspace = dir.path().join("worker-ws");
1052        std::fs::create_dir_all(&coordinator)?;
1053        std::fs::create_dir_all(worker_workspace.join(".ralph"))?;
1054
1055        let coordinator_queue = coordinator.join("queue.json");
1056        let coordinator_done = coordinator.join("done.json");
1057        let workspace_queue = worker_workspace.join(".ralph/queue.json");
1058        let workspace_done = worker_workspace.join(".ralph/done.json");
1059
1060        // Coordinator queue is semantically invalid for RQ id rules.
1061        let mut invalid_queue = QueueFile::default();
1062        invalid_queue
1063            .tasks
1064            .push(make_task("BAD-ID", TaskStatus::Todo));
1065        queue::save_queue(&coordinator_queue, &invalid_queue)?;
1066        queue::save_queue(&coordinator_done, &QueueFile::default())?;
1067
1068        // Workspace-local queue is valid, but should not be read.
1069        let mut valid_queue = QueueFile::default();
1070        valid_queue
1071            .tasks
1072            .push(make_task("RQ-0001", TaskStatus::Todo));
1073        queue::save_queue(&workspace_queue, &valid_queue)?;
1074        queue::save_queue(&workspace_done, &QueueFile::default())?;
1075
1076        let resolved = crate::config::Resolved {
1077            config: crate::contracts::Config::default(),
1078            repo_root: worker_workspace.clone(),
1079            queue_path: coordinator_queue,
1080            done_path: coordinator_done,
1081            id_prefix: "RQ".to_string(),
1082            id_width: 4,
1083            global_config_path: None,
1084            project_config_path: None,
1085        };
1086
1087        validate_queue_done_semantics(&worker_workspace, &resolved)
1088            .expect_err("validation should fail from resolved queue path");
1089        Ok(())
1090    }
1091
1092    #[test]
1093    fn blocked_marker_roundtrip() -> Result<()> {
1094        let temp = TempDir::new()?;
1095        write_blocked_push_marker(temp.path(), "RQ-0001", "blocked reason", 5, 5)?;
1096        let marker = read_blocked_push_marker(temp.path())?.expect("marker should exist");
1097        assert_eq!(marker.task_id, "RQ-0001");
1098        assert_eq!(marker.reason, "blocked reason");
1099        assert_eq!(marker.attempt, 5);
1100        assert_eq!(marker.max_attempts, 5);
1101
1102        clear_blocked_push_marker(temp.path());
1103        assert!(read_blocked_push_marker(temp.path())?.is_none());
1104        Ok(())
1105    }
1106}