// ralph/commands/run/parallel/integration.rs
1//! Worker integration loop for direct-push parallel mode.
2//!
3//! Responsibilities:
4//! - Drive a bounded integration loop through the phase continue-session.
5//! - Keep `fetch/rebase/conflict-fix/commit/push` execution agent-owned.
6//! - Enforce deterministic post-turn compliance gates before success.
7//! - Emit remediation handoff packets for blocked attempts.
8//!
9//! Not handled here:
10//! - Phase execution itself (see `run_one` phase modules).
11//! - Worker spawning/orchestration (see `worker.rs` and `orchestration.rs`).
12//!
13//! Invariants/assumptions:
14//! - Called after the worker has completed its configured phases.
15//! - Called only in parallel-worker mode.
16//! - Single-mode (`ralph run one` without `--parallel-worker`) is unchanged.
17
18#![allow(dead_code)]
19
20use crate::commands::run::supervision::{
21    ContinueSession, capture_ci_gate_result, resume_continue_session,
22};
23use crate::config::Resolved;
24use crate::contracts::TaskStatus;
25use crate::git;
26use crate::git::error::git_output;
27use crate::queue;
28use crate::runutil::sleep_with_cancellation;
29use crate::timeutil;
30use anyhow::{Context, Result, bail};
31use serde::{Deserialize, Serialize};
32use std::path::{Path, PathBuf};
33use std::time::Duration;
34
35// =============================================================================
36// Integration Loop Configuration
37// =============================================================================
38
/// Configuration for the integration loop.
///
/// Built from the resolved runtime config via [`IntegrationConfig::from_resolved`].
#[derive(Debug, Clone)]
pub struct IntegrationConfig {
    /// Maximum number of integration attempts.
    pub max_attempts: u32,
    /// Backoff intervals between retries (in milliseconds).
    /// Attempts beyond the list length reuse the final entry.
    pub backoff_ms: Vec<u64>,
    /// Target branch to push to.
    pub target_branch: String,
    /// Whether CI gate is enabled.
    pub ci_enabled: bool,
    /// Rendered CI gate label for prompts and handoff context.
    pub ci_label: String,
}
53
54impl IntegrationConfig {
55    pub fn from_resolved(resolved: &Resolved, target_branch: &str) -> Self {
56        let parallel = &resolved.config.parallel;
57        let target_branch = target_branch.trim();
58        Self {
59            max_attempts: parallel.max_push_attempts.unwrap_or(50) as u32,
60            backoff_ms: parallel
61                .push_backoff_ms
62                .clone()
63                .unwrap_or_else(super::default_push_backoff_ms),
64            target_branch: if target_branch.is_empty() {
65                "main".to_string()
66            } else {
67                target_branch.to_string()
68            },
69            ci_enabled: resolved
70                .config
71                .agent
72                .ci_gate
73                .as_ref()
74                .is_some_and(|ci_gate| ci_gate.is_enabled()),
75            ci_label: crate::commands::run::supervision::ci_gate_command_label(resolved),
76        }
77    }
78
79    /// Get backoff for a specific attempt index (0-indexed).
80    pub fn backoff_for_attempt(&self, attempt: usize) -> Duration {
81        let ms = self
82            .backoff_ms
83            .get(attempt)
84            .copied()
85            .unwrap_or_else(|| self.backoff_ms.last().copied().unwrap_or(10_000));
86        Duration::from_millis(ms)
87    }
88}
89
90// =============================================================================
91// Integration Outcome
92// =============================================================================
93
/// Outcome of the integration loop.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntegrationOutcome {
    /// Push succeeded and compliance gates passed.
    Success,
    /// Integration could not complete within bounded retries.
    /// A blocked-push marker is written (best effort) before this is returned.
    BlockedPush { reason: String },
    /// Terminal integration failure (for example no resumable session).
    Failed { reason: String },
}
104
/// Persisted marker written by workers when integration ends in `blocked_push`.
///
/// Serialized as pretty JSON at `super::BLOCKED_PUSH_MARKER_FILE` under the
/// workspace root (see `write_blocked_push_marker`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BlockedPushMarker {
    /// Trimmed task identifier the worker was integrating.
    pub task_id: String,
    /// Human-readable reason integration was blocked.
    pub reason: String,
    /// Attempt number at which the worker gave up.
    pub attempt: u32,
    /// Configured attempt budget.
    pub max_attempts: u32,
    /// Timestamp (RFC 3339 UTC, or fallback) of when the marker was written.
    pub generated_at: String,
}
114
115// =============================================================================
116// Handoff Packet
117// =============================================================================
118
/// Structured handoff packet for blocked remediation attempts.
///
/// Persisted as pretty JSON by [`write_handoff_packet`]; see that function
/// for the on-disk location.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemediationHandoff {
    /// Task identifier.
    pub task_id: String,
    /// Task title.
    pub task_title: String,
    /// Target branch for the push.
    pub target_branch: String,
    /// Current attempt number.
    pub attempt: u32,
    /// Maximum attempts allowed.
    pub max_attempts: u32,
    /// List of files with unresolved conflicts.
    pub conflict_files: Vec<String>,
    /// Current git status output.
    pub git_status: String,
    /// Phase outputs summary (last phase response summary).
    pub phase_summary: String,
    /// Original task intent snapshot.
    pub task_intent: String,
    /// CI context when validation failed on CI.
    /// Omitted from the serialized JSON entirely when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ci_context: Option<CiContext>,
    /// Timestamp when handoff was generated.
    pub generated_at: String,
    /// Queue/done semantic rules for conflict resolution.
    pub queue_done_rules: QueueDoneRules,
}
148
/// CI context for remediation handoff.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CiContext {
    /// Rendered CI gate command label.
    pub command: String,
    /// Output (or validation-error text) captured from the failing CI run.
    pub last_output: String,
    /// Exit code recorded for the CI run.
    pub exit_code: i32,
}
156
/// Semantic rules for queue/done conflict resolution.
///
/// Embedded in [`RemediationHandoff`]; baseline rules come from `Default`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueDoneRules {
    /// Ordered, human-readable rule strings.
    pub rules: Vec<String>,
}
162
163impl Default for QueueDoneRules {
164    fn default() -> Self {
165        Self {
166            rules: vec![
167                "Remove the completed task from the queue file".into(),
168                "Ensure the completed task is present in the done archive file".into(),
169                "Preserve entries from other workers exactly".into(),
170                "Preserve task metadata (timestamps/notes)".into(),
171            ],
172        }
173    }
174}
175
176impl RemediationHandoff {
177    pub fn new(
178        task_id: impl Into<String>,
179        task_title: impl Into<String>,
180        target_branch: impl Into<String>,
181        attempt: u32,
182        max_attempts: u32,
183    ) -> Self {
184        Self {
185            task_id: task_id.into(),
186            task_title: task_title.into(),
187            target_branch: target_branch.into(),
188            attempt,
189            max_attempts,
190            conflict_files: Vec::new(),
191            git_status: String::new(),
192            phase_summary: String::new(),
193            task_intent: String::new(),
194            ci_context: None,
195            generated_at: timeutil::now_utc_rfc3339_or_fallback(),
196            queue_done_rules: QueueDoneRules::default(),
197        }
198    }
199
200    pub fn with_conflicts(mut self, files: Vec<String>) -> Self {
201        self.conflict_files = files;
202        self
203    }
204
205    pub fn with_git_status(mut self, status: String) -> Self {
206        self.git_status = status;
207        self
208    }
209
210    pub fn with_phase_summary(mut self, summary: String) -> Self {
211        self.phase_summary = summary;
212        self
213    }
214
215    pub fn with_task_intent(mut self, intent: String) -> Self {
216        self.task_intent = intent;
217        self
218    }
219
220    pub fn with_ci_context(mut self, command: String, last_output: String, exit_code: i32) -> Self {
221        self.ci_context = Some(CiContext {
222            command,
223            last_output,
224            exit_code,
225        });
226        self
227    }
228}
229
230fn blocked_push_marker_path(workspace_path: &Path) -> PathBuf {
231    workspace_path.join(super::BLOCKED_PUSH_MARKER_FILE)
232}
233
234fn write_blocked_push_marker(
235    workspace_path: &Path,
236    task_id: &str,
237    reason: &str,
238    attempt: u32,
239    max_attempts: u32,
240) -> Result<()> {
241    let marker = BlockedPushMarker {
242        task_id: task_id.trim().to_string(),
243        reason: reason.to_string(),
244        attempt,
245        max_attempts,
246        generated_at: timeutil::now_utc_rfc3339_or_fallback(),
247    };
248    let marker_path = blocked_push_marker_path(workspace_path);
249    if let Some(parent) = marker_path.parent() {
250        std::fs::create_dir_all(parent)
251            .with_context(|| format!("create blocked marker directory {}", parent.display()))?;
252    }
253    let rendered = serde_json::to_string_pretty(&marker).context("serialize blocked marker")?;
254    crate::fsutil::write_atomic(&marker_path, rendered.as_bytes())
255        .with_context(|| format!("write blocked marker {}", marker_path.display()))?;
256    Ok(())
257}
258
259fn clear_blocked_push_marker(workspace_path: &Path) {
260    let marker_path = blocked_push_marker_path(workspace_path);
261    if !marker_path.exists() {
262        return;
263    }
264    if let Err(err) = std::fs::remove_file(&marker_path) {
265        log::warn!(
266            "Failed to clear blocked marker at {}: {}",
267            marker_path.display(),
268            err
269        );
270    }
271}
272
273pub(crate) fn read_blocked_push_marker(workspace_path: &Path) -> Result<Option<BlockedPushMarker>> {
274    let marker_path = blocked_push_marker_path(workspace_path);
275    if !marker_path.exists() {
276        return Ok(None);
277    }
278    let raw = std::fs::read_to_string(&marker_path)
279        .with_context(|| format!("read blocked marker {}", marker_path.display()))?;
280    let marker =
281        serde_json::from_str::<BlockedPushMarker>(&raw).context("parse blocked marker json")?;
282    Ok(Some(marker))
283}
284
285/// Write handoff packet to workspace cache directory.
286pub fn write_handoff_packet(
287    workspace_path: &Path,
288    task_id: &str,
289    attempt: u32,
290    handoff: &RemediationHandoff,
291) -> Result<PathBuf> {
292    let handoff_dir = workspace_path
293        .join(".ralph/cache/parallel/handoffs")
294        .join(task_id);
295    std::fs::create_dir_all(&handoff_dir)
296        .with_context(|| format!("create handoff directory {}", handoff_dir.display()))?;
297
298    let path = handoff_dir.join(format!("attempt_{}.json", attempt));
299    let content = serde_json::to_string_pretty(handoff).context("serialize handoff packet")?;
300    crate::fsutil::write_atomic(&path, content.as_bytes())
301        .with_context(|| format!("write handoff packet to {}", path.display()))?;
302    Ok(path)
303}
304
305// =============================================================================
306// Compliance Checks
307// =============================================================================
308
/// Result of deterministic compliance checks.
#[derive(Debug, Clone)]
pub struct ComplianceResult {
    /// True when git still reports conflicted files.
    pub has_unresolved_conflicts: bool,
    /// True when the queue/done files pass semantic validation.
    pub queue_done_valid: bool,
    /// True when the task is absent from the queue and archived as done.
    pub task_archived: bool,
    /// True when the CI gate passed (always true when the gate is disabled).
    pub ci_passed: bool,
    /// Paths of conflicted files, as reported by git.
    pub conflict_files: Vec<String>,
    /// Joined descriptions of every failed check; `None` when all passed.
    pub validation_error: Option<String>,
}
319
320impl ComplianceResult {
321    pub fn all_passed(&self) -> bool {
322        !self.has_unresolved_conflicts
323            && self.queue_done_valid
324            && self.task_archived
325            && self.ci_passed
326    }
327}
328
329/// Run deterministic compliance checks after each agent integration turn.
330pub fn run_compliance_checks(
331    repo_root: &Path,
332    resolved: &Resolved,
333    task_id: &str,
334    ci_enabled: bool,
335) -> Result<ComplianceResult> {
336    let conflict_files = git::list_conflict_files(repo_root)?;
337    let has_unresolved_conflicts = !conflict_files.is_empty();
338
339    let mut errors = Vec::new();
340    if has_unresolved_conflicts {
341        errors.push("unresolved merge conflicts remain".to_string());
342    }
343
344    let queue_done_valid = match validate_queue_done_semantics(repo_root, resolved) {
345        Ok(()) => true,
346        Err(err) => {
347            errors.push(format!("queue/done semantic validation failed: {}", err));
348            false
349        }
350    };
351
352    let task_archived = match validate_task_archived(resolved, task_id) {
353        Ok(()) => true,
354        Err(err) => {
355            errors.push(format!("task archival validation failed: {}", err));
356            false
357        }
358    };
359
360    let ci_passed = if ci_enabled {
361        match run_ci_check(repo_root, resolved) {
362            Ok(()) => true,
363            Err(err) => {
364                errors.push(format!("CI gate failed: {}", err));
365                false
366            }
367        }
368    } else {
369        true
370    };
371
372    Ok(ComplianceResult {
373        has_unresolved_conflicts,
374        queue_done_valid,
375        task_archived,
376        ci_passed,
377        conflict_files,
378        validation_error: if errors.is_empty() {
379            None
380        } else {
381            Some(errors.join("; "))
382        },
383    })
384}
385
386/// Validate queue/done files semantically from the resolved queue/done paths.
387fn validate_queue_done_semantics(_repo_root: &Path, resolved: &Resolved) -> Result<()> {
388    let queue_path = resolved.queue_path.clone();
389    let done_path = resolved.done_path.clone();
390
391    let queue = queue::load_queue(&queue_path).context("load queue for validation")?;
392    let max_depth = resolved.config.queue.max_dependency_depth.unwrap_or(10);
393    let done = if done_path.exists() {
394        Some(queue::load_queue(&done_path).context("load done for validation")?)
395    } else {
396        None
397    };
398
399    queue::validate_queue_set(
400        &queue,
401        done.as_ref(),
402        &resolved.id_prefix,
403        resolved.id_width,
404        max_depth,
405    )
406    .context("queue/done semantic validation")?;
407
408    Ok(())
409}
410
411/// Validate that the specific task is removed from queue and present as done.
412fn validate_task_archived(resolved: &Resolved, task_id: &str) -> Result<()> {
413    let task_id = task_id.trim();
414    if task_id.is_empty() {
415        bail!("task id is empty");
416    }
417
418    let queue_path = resolved.queue_path.clone();
419    let done_path = resolved.done_path.clone();
420
421    if !queue_path.exists() {
422        bail!("queue file missing at {}", queue_path.display());
423    }
424    if !done_path.exists() {
425        bail!("done file missing at {}", done_path.display());
426    }
427
428    let queue_file = queue::load_queue(&queue_path)
429        .with_context(|| format!("load queue file {}", queue_path.display()))?;
430    if queue_file
431        .tasks
432        .iter()
433        .any(|task| task.id.trim() == task_id)
434    {
435        bail!("task {} still present in {}", task_id, queue_path.display());
436    }
437
438    let done_file = queue::load_queue(&done_path)
439        .with_context(|| format!("load done file {}", done_path.display()))?;
440    let done_task = done_file
441        .tasks
442        .iter()
443        .find(|task| task.id.trim() == task_id)
444        .ok_or_else(|| anyhow::anyhow!("task {} missing from {}", task_id, done_path.display()))?;
445
446    if done_task.status != TaskStatus::Done {
447        bail!(
448            "task {} exists in done but status is {:?}, expected done",
449            task_id,
450            done_task.status
451        );
452    }
453
454    Ok(())
455}
456
457/// Run CI gate check as deterministic validation.
458fn run_ci_check(_repo_root: &Path, resolved: &Resolved) -> Result<()> {
459    let result = capture_ci_gate_result(resolved)?;
460    if !result.success {
461        let combined = format!("{}\n{}", result.stdout, result.stderr).to_lowercase();
462        if combined.contains("waiting for file lock")
463            || combined.contains("file lock on build directory")
464        {
465            bail!(
466                "CI lock contention detected (stale build/test process likely holding a lock). {} | {}",
467                result.stdout.trim(),
468                result.stderr.trim()
469            );
470        }
471        bail!("{} | {}", result.stdout.trim(), result.stderr.trim());
472    }
473
474    Ok(())
475}
476
477/// Verify that local HEAD is integrated into `origin/<target_branch>`.
478fn head_is_synced_to_remote(repo_root: &Path, target_branch: &str) -> Result<bool> {
479    git::fetch_branch(repo_root, "origin", target_branch)
480        .with_context(|| format!("fetch origin/{} for sync check", target_branch))?;
481
482    let remote_ref = format!("origin/{}", target_branch);
483    let output = git_output(
484        repo_root,
485        &["merge-base", "--is-ancestor", "HEAD", &remote_ref],
486    )
487    .with_context(|| format!("check if HEAD is ancestor of {}", remote_ref))?;
488
489    if output.status.success() {
490        return Ok(true);
491    }
492    if output.status.code() == Some(1) {
493        return Ok(false);
494    }
495
496    let stderr = String::from_utf8_lossy(&output.stderr);
497    bail!(
498        "unable to verify push sync against {}: {}",
499        remote_ref,
500        stderr.trim()
501    );
502}
503
504// =============================================================================
505// Integration Prompting
506// =============================================================================
507
/// Render the per-attempt integration prompt handed to the agent.
///
/// The prompt embeds the attempt counter, the current git status snapshot,
/// an optional failure summary from the previous attempt, and the mandatory
/// fetch/rebase/bookkeeping/CI/push sequence. The rendered text is passed
/// through `sanitize_prompt_for_runner` before being returned.
#[allow(clippy::too_many_arguments)]
fn build_agent_integration_prompt(
    task_id: &str,
    task_title: &str,
    target_branch: &str,
    queue_path: &Path,
    done_path: &Path,
    attempt: u32,
    max_attempts: u32,
    phase_summary: &str,
    status_snapshot: &str,
    ci_enabled: bool,
    ci_label: &str,
    previous_failure: Option<&str>,
) -> String {
    let queue_path_display = queue_path.display();
    let done_path_display = done_path.display();
    // Rendered only when the previous attempt left a failure summary behind;
    // otherwise the section collapses to nothing.
    let failure_block = previous_failure.map_or_else(String::new, |failure| {
        format!("\n## Previous Attempt Failed\n{}\n", failure)
    });

    // Step 6 of the sequence either runs the CI gate command or states that
    // the gate is disabled.
    let ci_block = if ci_enabled {
        format!(
            "- Run CI gate and fix failures before pushing: `{}`",
            ci_label
        )
    } else {
        "- CI gate is disabled for this task".to_string()
    };

    sanitize_prompt_for_runner(&format!(
        r#"# Parallel Integration (Mandatory) - Attempt {attempt}/{max_attempts}
You are finalizing task `{task_id}` (`{task_title}`) for direct push to `origin/{target_branch}`.

## Hard Requirement
You MUST execute integration git operations yourself in this turn. Do not stop early.
You are NOT done until all required checks are satisfied.

## Context
- Phase summary: {phase_summary}
- Current git status snapshot:
```text
{status_snapshot}
```
{failure_block}
## Required Sequence
1. `git fetch origin {target_branch}`
2. Rebase on latest remote state: `git rebase origin/{target_branch}`
3. If conflicts exist:
   - Resolve every conflict marker while preserving both upstream and task intent.
   - For queue/done files, preserve other workers' entries exactly.
   - Ensure `{task_id}` is removed from queue and present as done in done.
   - Continue rebase until complete (`git add ...`, `git rebase --continue`).
4. Ensure bookkeeping is correct:
   - `{queue_path_display}` does NOT contain `{task_id}`
   - `{done_path_display}` DOES contain `{task_id}` with done status
5. Stage and commit any remaining changes needed for integration.
6. {ci_block}
7. Push directly to base branch: `git push origin HEAD:{target_branch}`
8. If push is rejected (non-fast-forward), repeat from step 1 in this same turn.

## Completion Contract (Mandatory)
Before ending your response:
- No unresolved merge conflicts remain.
- Push to `origin/{target_branch}` has succeeded.
- Bookkeeping files are semantically correct for `{task_id}`.
- CI has passed when enabled.

If any check fails, keep working in this same turn until fixed.
"#
    ))
}
580
/// Replace control characters (except newline, carriage return, and tab)
/// with spaces so the rendered prompt is safe to hand to the runner.
fn sanitize_prompt_for_runner(prompt: &str) -> String {
    let mut cleaned = String::with_capacity(prompt.len());
    for ch in prompt.chars() {
        let allowed = !ch.is_control() || matches!(ch, '\n' | '\r' | '\t');
        cleaned.push(if allowed { ch } else { ' ' });
    }
    cleaned
}
593
594fn compose_block_reason(
595    compliance: &ComplianceResult,
596    pushed: bool,
597    extra: Option<&str>,
598) -> String {
599    let mut reasons = Vec::new();
600
601    if compliance.has_unresolved_conflicts {
602        reasons.push(format!(
603            "unresolved conflicts: {}",
604            compliance.conflict_files.join(", ")
605        ));
606    }
607    if !compliance.queue_done_valid {
608        reasons.push("queue/done semantic validation failed".to_string());
609    }
610    if !compliance.task_archived {
611        reasons.push("task archival validation failed".to_string());
612    }
613    if !compliance.ci_passed {
614        reasons.push("CI validation failed".to_string());
615    }
616    if !pushed {
617        reasons.push("HEAD is not yet integrated into target branch".to_string());
618    }
619    if let Some(extra) = extra {
620        reasons.push(extra.to_string());
621    }
622
623    if let Some(validation_error) = &compliance.validation_error {
624        reasons.push(validation_error.clone());
625    }
626
627    if reasons.is_empty() {
628        "integration did not satisfy completion contract".to_string()
629    } else {
630        reasons.join("; ")
631    }
632}
633
634// =============================================================================
635// Integration Loop
636// =============================================================================
637
/// Run the integration loop for a completed worker.
///
/// Integration actions are agent-owned via continue-session prompts.
/// Ralph only validates completion and retries when contract checks fail.
///
/// Per attempt: build a prompt from the current git status, resume the
/// agent's continue-session with it, run deterministic compliance checks plus
/// a push-sync check, and either return `Success` or persist a remediation
/// handoff packet and retry after backoff. When the attempt budget is
/// exhausted, a blocked-push marker is written (best effort) and
/// `BlockedPush` is returned.
#[allow(clippy::too_many_arguments)]
pub(crate) fn run_integration_loop(
    resolved: &Resolved,
    task_id: &str,
    task_title: &str,
    config: &IntegrationConfig,
    phase_summary: &str,
    continue_session: &mut ContinueSession,
    on_resume: &mut dyn FnMut(&crate::runner::RunnerOutput, Duration) -> Result<()>,
    plugins: Option<&crate::plugins::registry::PluginRegistry>,
) -> Result<IntegrationOutcome> {
    let repo_root = &resolved.repo_root;
    // Drop any marker left behind by a previous blocked run before starting.
    clear_blocked_push_marker(repo_root);
    let mut previous_failure: Option<String> = None;

    for attempt_index in 0..config.max_attempts {
        let attempt = attempt_index + 1;
        log::info!(
            "Agent-owned integration attempt {}/{} for {}",
            attempt,
            config.max_attempts,
            task_id
        );

        // Snapshot git status so the prompt reflects the repo state entering
        // this attempt; failures degrade to an empty snapshot.
        let status_snapshot = git::status_porcelain(repo_root).unwrap_or_default();
        let prompt = build_agent_integration_prompt(
            task_id,
            task_title,
            &config.target_branch,
            &resolved.queue_path,
            &resolved.done_path,
            attempt,
            config.max_attempts,
            phase_summary,
            &status_snapshot,
            config.ci_enabled,
            &config.ci_label,
            previous_failure.as_deref(),
        );

        // A failed resume is retried like any other failed attempt; on the
        // final attempt it becomes a blocked-push outcome instead.
        let (output, elapsed) =
            match resume_continue_session(resolved, continue_session, &prompt, plugins) {
                Ok(resume) => resume,
                Err(err) => {
                    let reason = format!("integration continuation failed: {:#}", err);
                    if attempt >= config.max_attempts {
                        if let Err(marker_err) = write_blocked_push_marker(
                            repo_root,
                            task_id,
                            &reason,
                            attempt,
                            config.max_attempts,
                        ) {
                            log::warn!("Failed to write blocked marker: {}", marker_err);
                        }
                        return Ok(IntegrationOutcome::BlockedPush { reason });
                    }
                    previous_failure = Some(reason);
                    wait_before_retry(config, attempt_index as usize, task_id)?;
                    continue;
                }
            };

        // Let the caller observe the agent turn (runner output + duration).
        on_resume(&output, elapsed)?;

        // Deterministic gates: compliance checks plus verification that HEAD
        // actually landed on the remote target branch.
        let compliance = run_compliance_checks(repo_root, resolved, task_id, config.ci_enabled)?;
        let (pushed, push_check_error) =
            match head_is_synced_to_remote(repo_root, &config.target_branch) {
                Ok(value) => (value, None),
                Err(err) => (false, Some(format!("push sync validation failed: {}", err))),
            };

        if compliance.all_passed() && pushed {
            log::info!(
                "Integration succeeded for {} on attempt {}/{}",
                task_id,
                attempt,
                config.max_attempts
            );
            return Ok(IntegrationOutcome::Success);
        }

        // Attempt failed: persist a remediation handoff packet describing
        // what blocked it, then retry (or give up on the last attempt).
        let reason = compose_block_reason(&compliance, pushed, push_check_error.as_deref());
        let mut handoff = RemediationHandoff::new(
            task_id,
            task_title,
            &config.target_branch,
            attempt,
            config.max_attempts,
        )
        .with_conflicts(compliance.conflict_files.clone())
        .with_git_status(git::status_porcelain(repo_root).unwrap_or_default())
        .with_phase_summary(phase_summary.to_string())
        .with_task_intent(format!("Complete task {}: {}", task_id, task_title));

        if !compliance.ci_passed {
            handoff = handoff.with_ci_context(
                config.ci_label.clone(),
                compliance
                    .validation_error
                    .clone()
                    .unwrap_or_else(|| "CI gate validation failed".to_string()),
                1,
            );
        }

        if let Err(err) = write_handoff_packet(repo_root, task_id, attempt, &handoff) {
            log::warn!("Failed to persist remediation handoff packet: {}", err);
        }

        if attempt >= config.max_attempts {
            if let Err(marker_err) =
                write_blocked_push_marker(repo_root, task_id, &reason, attempt, config.max_attempts)
            {
                log::warn!("Failed to write blocked marker: {}", marker_err);
            }
            return Ok(IntegrationOutcome::BlockedPush { reason });
        }

        previous_failure = Some(reason);
        wait_before_retry(config, attempt_index as usize, task_id)?;
    }

    // Reached only when `max_attempts` is 0 (the loop body otherwise returns
    // on the final attempt).
    let reason = format!("integration exhausted {} attempts", config.max_attempts);
    if let Err(marker_err) = write_blocked_push_marker(
        repo_root,
        task_id,
        &reason,
        config.max_attempts,
        config.max_attempts,
    ) {
        log::warn!("Failed to write blocked marker: {}", marker_err);
    }
    Ok(IntegrationOutcome::BlockedPush { reason })
}
777
778fn wait_before_retry(
779    config: &IntegrationConfig,
780    attempt_index: usize,
781    task_id: &str,
782) -> Result<()> {
783    let delay = config.backoff_for_attempt(attempt_index);
784    log::info!(
785        "Integration retry backoff for {}: sleeping {}ms before next attempt",
786        task_id,
787        delay.as_millis()
788    );
789    sleep_with_cancellation(delay, None)
790        .map_err(|_| anyhow::anyhow!("integration retry cancelled for {}", task_id))
791}
792
793#[cfg(test)]
794mod tests {
795    use super::*;
796    use crate::contracts::{QueueFile, Task, TaskPriority, TaskStatus};
797    use std::collections::HashMap;
798    use tempfile::TempDir;
799
    /// Build a minimal `Task` fixture: the given id/status, fixed 2026-01-01
    /// timestamps, and every other field empty or `None`.
    fn make_task(id: &str, status: TaskStatus) -> Task {
        Task {
            id: id.to_string(),
            title: format!("Task {}", id),
            description: None,
            status,
            priority: TaskPriority::Medium,
            tags: vec![],
            scope: vec![],
            evidence: vec![],
            plan: vec![],
            notes: vec![],
            request: None,
            agent: None,
            created_at: Some("2026-01-01T00:00:00Z".to_string()),
            updated_at: Some("2026-01-01T00:00:00Z".to_string()),
            completed_at: None,
            started_at: None,
            scheduled_start: None,
            depends_on: vec![],
            blocks: vec![],
            relates_to: vec![],
            duplicates: None,
            custom_fields: HashMap::new(),
            estimated_minutes: None,
            actual_minutes: None,
            parent_id: None,
        }
    }
829
830    #[test]
831    fn integration_config_default_backoff() {
832        let config = IntegrationConfig {
833            max_attempts: 5,
834            backoff_ms: vec![500, 2000, 5000, 10000],
835            target_branch: "main".into(),
836            ci_enabled: true,
837            ci_label: "make ci".into(),
838        };
839
840        assert_eq!(config.backoff_for_attempt(0), Duration::from_millis(500));
841        assert_eq!(config.backoff_for_attempt(1), Duration::from_millis(2000));
842        assert_eq!(config.backoff_for_attempt(2), Duration::from_millis(5000));
843        assert_eq!(config.backoff_for_attempt(3), Duration::from_millis(10000));
844        assert_eq!(config.backoff_for_attempt(4), Duration::from_millis(10000));
845        assert_eq!(config.backoff_for_attempt(10), Duration::from_millis(10000));
846    }
847
848    #[test]
849    fn remediation_handoff_builder() {
850        let handoff = RemediationHandoff::new("RQ-0001", "Test Task", "main", 2, 5)
851            .with_conflicts(vec!["src/lib.rs".into(), "src/main.rs".into()])
852            .with_git_status("UU src/lib.rs\nUU src/main.rs".into())
853            .with_phase_summary("Implemented feature X".into())
854            .with_task_intent("Complete feature X implementation".into());
855
856        assert_eq!(handoff.task_id, "RQ-0001");
857        assert_eq!(handoff.task_title, "Test Task");
858        assert_eq!(handoff.target_branch, "main");
859        assert_eq!(handoff.attempt, 2);
860        assert_eq!(handoff.max_attempts, 5);
861        assert_eq!(handoff.conflict_files.len(), 2);
862        assert_eq!(handoff.phase_summary, "Implemented feature X");
863        assert!(handoff.ci_context.is_none());
864    }
865
866    #[test]
867    fn remediation_handoff_with_ci() {
868        let handoff = RemediationHandoff::new("RQ-0001", "Test", "main", 1, 5).with_ci_context(
869            "make ci".into(),
870            "test failed".into(),
871            1,
872        );
873
874        assert!(handoff.ci_context.is_some());
875        let ci = handoff.ci_context.unwrap();
876        assert_eq!(ci.command, "make ci");
877        assert_eq!(ci.last_output, "test failed");
878        assert_eq!(ci.exit_code, 1);
879    }
880
881    #[test]
882    fn integration_prompt_contains_mandatory_contract() {
883        let prompt = build_agent_integration_prompt(
884            "RQ-0001",
885            "Implement feature",
886            "main",
887            Path::new("/tmp/queue.json"),
888            Path::new("/tmp/done.json"),
889            1,
890            5,
891            "phase summary",
892            " M src/lib.rs",
893            true,
894            "make ci",
895            Some("previous failure"),
896        );
897
898        assert!(prompt.contains("MUST execute integration git operations"));
899        assert!(prompt.contains("Completion Contract (Mandatory)"));
900        assert!(prompt.contains("git push origin HEAD:main"));
901        assert!(prompt.contains("previous failure"));
902    }
903
904    #[test]
905    fn integration_prompt_uses_explicit_target_branch_for_push() {
906        let prompt = build_agent_integration_prompt(
907            "RQ-0001",
908            "Implement feature",
909            "release/2026",
910            Path::new("/tmp/queue.json"),
911            Path::new("/tmp/done.json"),
912            1,
913            5,
914            "phase summary",
915            " M src/lib.rs",
916            true,
917            "make ci",
918            None,
919        );
920
921        assert!(prompt.contains("git fetch origin release/2026"));
922        assert!(prompt.contains("git rebase origin/release/2026"));
923        assert!(prompt.contains("git push origin HEAD:release/2026"));
924    }
925
926    #[test]
927    fn integration_prompt_sanitizes_nul_bytes() {
928        let prompt = build_agent_integration_prompt(
929            "RQ-0001",
930            "NUL test",
931            "main",
932            Path::new("/tmp/queue.json"),
933            Path::new("/tmp/done.json"),
934            1,
935            5,
936            "phase\0summary",
937            "status\0snapshot",
938            true,
939            "make ci",
940            Some("previous\0failure"),
941        );
942
943        assert!(!prompt.contains('\0'));
944        assert!(prompt.contains("phase summary"));
945        assert!(prompt.contains("status snapshot"));
946        assert!(prompt.contains("previous failure"));
947    }
948
949    #[test]
950    fn compliance_result_all_passed() {
951        let passed = ComplianceResult {
952            has_unresolved_conflicts: false,
953            queue_done_valid: true,
954            task_archived: true,
955            ci_passed: true,
956            conflict_files: vec![],
957            validation_error: None,
958        };
959        assert!(passed.all_passed());
960
961        let failed = ComplianceResult {
962            has_unresolved_conflicts: false,
963            queue_done_valid: true,
964            task_archived: false,
965            ci_passed: true,
966            conflict_files: vec![],
967            validation_error: None,
968        };
969        assert!(!failed.all_passed());
970    }
971
972    #[test]
973    fn integration_config_uses_explicit_target_branch() -> Result<()> {
974        let dir = tempfile::TempDir::new()?;
975        let resolved = crate::config::Resolved {
976            config: crate::contracts::Config::default(),
977            repo_root: dir.path().to_path_buf(),
978            queue_path: dir.path().join(".ralph/queue.json"),
979            done_path: dir.path().join(".ralph/done.json"),
980            id_prefix: "RQ".to_string(),
981            id_width: 4,
982            global_config_path: None,
983            project_config_path: None,
984        };
985
986        let cfg = IntegrationConfig::from_resolved(&resolved, "release/2026");
987        assert_eq!(cfg.target_branch, "release/2026");
988        Ok(())
989    }
990
991    #[test]
992    fn task_archived_validation_uses_resolved_paths_not_workspace_local_files() -> Result<()> {
993        let dir = TempDir::new()?;
994        let coordinator = dir.path().join("coordinator");
995        let worker_workspace = dir.path().join("worker-ws");
996        std::fs::create_dir_all(&coordinator)?;
997        std::fs::create_dir_all(worker_workspace.join(".ralph"))?;
998
999        let coordinator_queue = coordinator.join("queue.json");
1000        let coordinator_done = coordinator.join("done.json");
1001        let workspace_queue = worker_workspace.join(".ralph/queue.json");
1002        let workspace_done = worker_workspace.join(".ralph/done.json");
1003
1004        let mut coordinator_queue_file = QueueFile::default();
1005        coordinator_queue_file
1006            .tasks
1007            .push(make_task("RQ-0001", TaskStatus::Todo));
1008        queue::save_queue(&coordinator_queue, &coordinator_queue_file)?;
1009        queue::save_queue(&coordinator_done, &QueueFile::default())?;
1010
1011        // Workspace-local files look archived, but should be ignored by validation.
1012        queue::save_queue(&workspace_queue, &QueueFile::default())?;
1013        let mut workspace_done_file = QueueFile::default();
1014        workspace_done_file
1015            .tasks
1016            .push(make_task("RQ-0001", TaskStatus::Done));
1017        queue::save_queue(&workspace_done, &workspace_done_file)?;
1018
1019        let resolved = crate::config::Resolved {
1020            config: crate::contracts::Config::default(),
1021            repo_root: worker_workspace,
1022            queue_path: coordinator_queue.clone(),
1023            done_path: coordinator_done,
1024            id_prefix: "RQ".to_string(),
1025            id_width: 4,
1026            global_config_path: None,
1027            project_config_path: None,
1028        };
1029
1030        let err = validate_task_archived(&resolved, "RQ-0001")
1031            .expect_err("validation should use resolved queue path");
1032        let msg = err.to_string();
1033        assert!(
1034            msg.contains(coordinator_queue.to_string_lossy().as_ref()),
1035            "error should reference resolved queue path, got: {msg}"
1036        );
1037        Ok(())
1038    }
1039
1040    #[test]
1041    fn queue_done_semantics_validation_uses_resolved_paths() -> Result<()> {
1042        let dir = TempDir::new()?;
1043        let coordinator = dir.path().join("coordinator");
1044        let worker_workspace = dir.path().join("worker-ws");
1045        std::fs::create_dir_all(&coordinator)?;
1046        std::fs::create_dir_all(worker_workspace.join(".ralph"))?;
1047
1048        let coordinator_queue = coordinator.join("queue.json");
1049        let coordinator_done = coordinator.join("done.json");
1050        let workspace_queue = worker_workspace.join(".ralph/queue.json");
1051        let workspace_done = worker_workspace.join(".ralph/done.json");
1052
1053        // Coordinator queue is semantically invalid for RQ id rules.
1054        let mut invalid_queue = QueueFile::default();
1055        invalid_queue
1056            .tasks
1057            .push(make_task("BAD-ID", TaskStatus::Todo));
1058        queue::save_queue(&coordinator_queue, &invalid_queue)?;
1059        queue::save_queue(&coordinator_done, &QueueFile::default())?;
1060
1061        // Workspace-local queue is valid, but should not be read.
1062        let mut valid_queue = QueueFile::default();
1063        valid_queue
1064            .tasks
1065            .push(make_task("RQ-0001", TaskStatus::Todo));
1066        queue::save_queue(&workspace_queue, &valid_queue)?;
1067        queue::save_queue(&workspace_done, &QueueFile::default())?;
1068
1069        let resolved = crate::config::Resolved {
1070            config: crate::contracts::Config::default(),
1071            repo_root: worker_workspace.clone(),
1072            queue_path: coordinator_queue,
1073            done_path: coordinator_done,
1074            id_prefix: "RQ".to_string(),
1075            id_width: 4,
1076            global_config_path: None,
1077            project_config_path: None,
1078        };
1079
1080        validate_queue_done_semantics(&worker_workspace, &resolved)
1081            .expect_err("validation should fail from resolved queue path");
1082        Ok(())
1083    }
1084
1085    #[test]
1086    fn blocked_marker_roundtrip() -> Result<()> {
1087        let temp = TempDir::new()?;
1088        write_blocked_push_marker(temp.path(), "RQ-0001", "blocked reason", 5, 5)?;
1089        let marker = read_blocked_push_marker(temp.path())?.expect("marker should exist");
1090        assert_eq!(marker.task_id, "RQ-0001");
1091        assert_eq!(marker.reason, "blocked reason");
1092        assert_eq!(marker.attempt, 5);
1093        assert_eq!(marker.max_attempts, 5);
1094
1095        clear_blocked_push_marker(temp.path());
1096        assert!(read_blocked_push_marker(temp.path())?.is_none());
1097        Ok(())
1098    }
1099}