1#![allow(dead_code)]
19
20use crate::commands::run::supervision::{ContinueSession, resume_continue_session};
21use crate::config::Resolved;
22use crate::contracts::TaskStatus;
23use crate::git;
24use crate::queue;
25use crate::timeutil;
26use anyhow::{Context, Result, bail};
27use serde::{Deserialize, Serialize};
28use std::path::{Path, PathBuf};
29use std::process::Command;
30use std::time::Duration;
31
32#[derive(Debug, Clone)]
38pub struct IntegrationConfig {
39 pub max_attempts: u32,
41 pub backoff_ms: Vec<u64>,
43 pub target_branch: String,
45 pub ci_command: Option<String>,
47 pub ci_enabled: bool,
49}
50
51impl IntegrationConfig {
52 pub fn from_resolved(resolved: &Resolved, target_branch: &str) -> Self {
53 let parallel = &resolved.config.parallel;
54 let target_branch = target_branch.trim();
55 Self {
56 max_attempts: parallel.max_push_attempts.unwrap_or(50) as u32,
57 backoff_ms: parallel
58 .push_backoff_ms
59 .clone()
60 .unwrap_or_else(super::default_push_backoff_ms),
61 target_branch: if target_branch.is_empty() {
62 "main".to_string()
63 } else {
64 target_branch.to_string()
65 },
66 ci_command: resolved.config.agent.ci_gate_command.clone(),
67 ci_enabled: resolved.config.agent.ci_gate_enabled.unwrap_or(true),
68 }
69 }
70
71 pub fn backoff_for_attempt(&self, attempt: usize) -> Duration {
73 let ms = self
74 .backoff_ms
75 .get(attempt)
76 .copied()
77 .unwrap_or_else(|| self.backoff_ms.last().copied().unwrap_or(10_000));
78 Duration::from_millis(ms)
79 }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq)]
88pub enum IntegrationOutcome {
89 Success,
91 BlockedPush { reason: String },
93 Failed { reason: String },
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
99pub(crate) struct BlockedPushMarker {
100 pub task_id: String,
101 pub reason: String,
102 pub attempt: u32,
103 pub max_attempts: u32,
104 pub generated_at: String,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct RemediationHandoff {
114 pub task_id: String,
116 pub task_title: String,
118 pub target_branch: String,
120 pub attempt: u32,
122 pub max_attempts: u32,
124 pub conflict_files: Vec<String>,
126 pub git_status: String,
128 pub phase_summary: String,
130 pub task_intent: String,
132 #[serde(skip_serializing_if = "Option::is_none")]
134 pub ci_context: Option<CiContext>,
135 pub generated_at: String,
137 pub queue_done_rules: QueueDoneRules,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct CiContext {
144 pub command: String,
145 pub last_output: String,
146 pub exit_code: i32,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct QueueDoneRules {
152 pub rules: Vec<String>,
153}
154
155impl Default for QueueDoneRules {
156 fn default() -> Self {
157 Self {
158 rules: vec![
159 "Remove the completed task from the queue file".into(),
160 "Ensure the completed task is present in the done archive file".into(),
161 "Preserve entries from other workers exactly".into(),
162 "Preserve task metadata (timestamps/notes)".into(),
163 ],
164 }
165 }
166}
167
168impl RemediationHandoff {
169 pub fn new(
170 task_id: impl Into<String>,
171 task_title: impl Into<String>,
172 target_branch: impl Into<String>,
173 attempt: u32,
174 max_attempts: u32,
175 ) -> Self {
176 Self {
177 task_id: task_id.into(),
178 task_title: task_title.into(),
179 target_branch: target_branch.into(),
180 attempt,
181 max_attempts,
182 conflict_files: Vec::new(),
183 git_status: String::new(),
184 phase_summary: String::new(),
185 task_intent: String::new(),
186 ci_context: None,
187 generated_at: timeutil::now_utc_rfc3339_or_fallback(),
188 queue_done_rules: QueueDoneRules::default(),
189 }
190 }
191
192 pub fn with_conflicts(mut self, files: Vec<String>) -> Self {
193 self.conflict_files = files;
194 self
195 }
196
197 pub fn with_git_status(mut self, status: String) -> Self {
198 self.git_status = status;
199 self
200 }
201
202 pub fn with_phase_summary(mut self, summary: String) -> Self {
203 self.phase_summary = summary;
204 self
205 }
206
207 pub fn with_task_intent(mut self, intent: String) -> Self {
208 self.task_intent = intent;
209 self
210 }
211
212 pub fn with_ci_context(mut self, command: String, last_output: String, exit_code: i32) -> Self {
213 self.ci_context = Some(CiContext {
214 command,
215 last_output,
216 exit_code,
217 });
218 self
219 }
220}
221
222fn blocked_push_marker_path(workspace_path: &Path) -> PathBuf {
223 workspace_path.join(super::BLOCKED_PUSH_MARKER_FILE)
224}
225
226fn write_blocked_push_marker(
227 workspace_path: &Path,
228 task_id: &str,
229 reason: &str,
230 attempt: u32,
231 max_attempts: u32,
232) -> Result<()> {
233 let marker = BlockedPushMarker {
234 task_id: task_id.trim().to_string(),
235 reason: reason.to_string(),
236 attempt,
237 max_attempts,
238 generated_at: timeutil::now_utc_rfc3339_or_fallback(),
239 };
240 let marker_path = blocked_push_marker_path(workspace_path);
241 if let Some(parent) = marker_path.parent() {
242 std::fs::create_dir_all(parent)
243 .with_context(|| format!("create blocked marker directory {}", parent.display()))?;
244 }
245 let rendered = serde_json::to_string_pretty(&marker).context("serialize blocked marker")?;
246 crate::fsutil::write_atomic(&marker_path, rendered.as_bytes())
247 .with_context(|| format!("write blocked marker {}", marker_path.display()))?;
248 Ok(())
249}
250
251fn clear_blocked_push_marker(workspace_path: &Path) {
252 let marker_path = blocked_push_marker_path(workspace_path);
253 if !marker_path.exists() {
254 return;
255 }
256 if let Err(err) = std::fs::remove_file(&marker_path) {
257 log::warn!(
258 "Failed to clear blocked marker at {}: {}",
259 marker_path.display(),
260 err
261 );
262 }
263}
264
265pub(crate) fn read_blocked_push_marker(workspace_path: &Path) -> Result<Option<BlockedPushMarker>> {
266 let marker_path = blocked_push_marker_path(workspace_path);
267 if !marker_path.exists() {
268 return Ok(None);
269 }
270 let raw = std::fs::read_to_string(&marker_path)
271 .with_context(|| format!("read blocked marker {}", marker_path.display()))?;
272 let marker =
273 serde_json::from_str::<BlockedPushMarker>(&raw).context("parse blocked marker json")?;
274 Ok(Some(marker))
275}
276
277pub fn write_handoff_packet(
279 workspace_path: &Path,
280 task_id: &str,
281 attempt: u32,
282 handoff: &RemediationHandoff,
283) -> Result<PathBuf> {
284 let handoff_dir = workspace_path
285 .join(".ralph/cache/parallel/handoffs")
286 .join(task_id);
287 std::fs::create_dir_all(&handoff_dir)
288 .with_context(|| format!("create handoff directory {}", handoff_dir.display()))?;
289
290 let path = handoff_dir.join(format!("attempt_{}.json", attempt));
291 let content = serde_json::to_string_pretty(handoff).context("serialize handoff packet")?;
292 crate::fsutil::write_atomic(&path, content.as_bytes())
293 .with_context(|| format!("write handoff packet to {}", path.display()))?;
294 Ok(path)
295}
296
297#[derive(Debug, Clone)]
303pub struct ComplianceResult {
304 pub has_unresolved_conflicts: bool,
305 pub queue_done_valid: bool,
306 pub task_archived: bool,
307 pub ci_passed: bool,
308 pub conflict_files: Vec<String>,
309 pub validation_error: Option<String>,
310}
311
312impl ComplianceResult {
313 pub fn all_passed(&self) -> bool {
314 !self.has_unresolved_conflicts
315 && self.queue_done_valid
316 && self.task_archived
317 && self.ci_passed
318 }
319}
320
321pub fn run_compliance_checks(
323 repo_root: &Path,
324 resolved: &Resolved,
325 task_id: &str,
326 ci_enabled: bool,
327) -> Result<ComplianceResult> {
328 let conflict_files = git::list_conflict_files(repo_root)?;
329 let has_unresolved_conflicts = !conflict_files.is_empty();
330
331 let mut errors = Vec::new();
332 if has_unresolved_conflicts {
333 errors.push("unresolved merge conflicts remain".to_string());
334 }
335
336 let queue_done_valid = match validate_queue_done_semantics(repo_root, resolved) {
337 Ok(()) => true,
338 Err(err) => {
339 errors.push(format!("queue/done semantic validation failed: {}", err));
340 false
341 }
342 };
343
344 let task_archived = match validate_task_archived(resolved, task_id) {
345 Ok(()) => true,
346 Err(err) => {
347 errors.push(format!("task archival validation failed: {}", err));
348 false
349 }
350 };
351
352 let ci_passed = if ci_enabled {
353 match run_ci_check(repo_root, resolved) {
354 Ok(()) => true,
355 Err(err) => {
356 errors.push(format!("CI gate failed: {}", err));
357 false
358 }
359 }
360 } else {
361 true
362 };
363
364 Ok(ComplianceResult {
365 has_unresolved_conflicts,
366 queue_done_valid,
367 task_archived,
368 ci_passed,
369 conflict_files,
370 validation_error: if errors.is_empty() {
371 None
372 } else {
373 Some(errors.join("; "))
374 },
375 })
376}
377
378fn validate_queue_done_semantics(_repo_root: &Path, resolved: &Resolved) -> Result<()> {
380 let queue_path = resolved.queue_path.clone();
381 let done_path = resolved.done_path.clone();
382
383 let queue = queue::load_queue(&queue_path).context("load queue for validation")?;
384 let max_depth = resolved.config.queue.max_dependency_depth.unwrap_or(10);
385 let done = if done_path.exists() {
386 Some(queue::load_queue(&done_path).context("load done for validation")?)
387 } else {
388 None
389 };
390
391 queue::validate_queue_set(
392 &queue,
393 done.as_ref(),
394 &resolved.id_prefix,
395 resolved.id_width,
396 max_depth,
397 )
398 .context("queue/done semantic validation")?;
399
400 Ok(())
401}
402
403fn validate_task_archived(resolved: &Resolved, task_id: &str) -> Result<()> {
405 let task_id = task_id.trim();
406 if task_id.is_empty() {
407 bail!("task id is empty");
408 }
409
410 let queue_path = resolved.queue_path.clone();
411 let done_path = resolved.done_path.clone();
412
413 if !queue_path.exists() {
414 bail!("queue file missing at {}", queue_path.display());
415 }
416 if !done_path.exists() {
417 bail!("done file missing at {}", done_path.display());
418 }
419
420 let queue_file = queue::load_queue(&queue_path)
421 .with_context(|| format!("load queue file {}", queue_path.display()))?;
422 if queue_file
423 .tasks
424 .iter()
425 .any(|task| task.id.trim() == task_id)
426 {
427 bail!("task {} still present in {}", task_id, queue_path.display());
428 }
429
430 let done_file = queue::load_queue(&done_path)
431 .with_context(|| format!("load done file {}", done_path.display()))?;
432 let done_task = done_file
433 .tasks
434 .iter()
435 .find(|task| task.id.trim() == task_id)
436 .ok_or_else(|| anyhow::anyhow!("task {} missing from {}", task_id, done_path.display()))?;
437
438 if done_task.status != TaskStatus::Done {
439 bail!(
440 "task {} exists in done but status is {:?}, expected done",
441 task_id,
442 done_task.status
443 );
444 }
445
446 Ok(())
447}
448
449fn run_ci_check(repo_root: &Path, resolved: &Resolved) -> Result<()> {
451 let ci_command = resolved
452 .config
453 .agent
454 .ci_gate_command
455 .as_deref()
456 .unwrap_or("make ci");
457
458 log::info!(
459 "Running CI gate validation (may take several minutes): {}",
460 ci_command
461 );
462 let started = std::time::Instant::now();
463
464 let output = Command::new("sh")
465 .arg("-c")
466 .arg(ci_command)
467 .current_dir(repo_root)
468 .output()
469 .context("spawn CI gate command")?;
470
471 log::info!(
472 "CI gate validation finished in {:.1}s with exit code {:?}",
473 started.elapsed().as_secs_f64(),
474 output.status.code()
475 );
476
477 if !output.status.success() {
478 let stderr = String::from_utf8_lossy(&output.stderr);
479 let stdout = String::from_utf8_lossy(&output.stdout);
480 let combined = format!("{}\n{}", stdout, stderr).to_lowercase();
481 if combined.contains("waiting for file lock")
482 || combined.contains("file lock on build directory")
483 {
484 bail!(
485 "CI lock contention detected (stale build/test process likely holding a lock). {} | {}",
486 stdout.trim(),
487 stderr.trim()
488 );
489 }
490 bail!("{} | {}", stdout.trim(), stderr.trim());
491 }
492
493 Ok(())
494}
495
496fn head_is_synced_to_remote(repo_root: &Path, target_branch: &str) -> Result<bool> {
498 git::fetch_branch(repo_root, "origin", target_branch)
499 .with_context(|| format!("fetch origin/{} for sync check", target_branch))?;
500
501 let remote_ref = format!("origin/{}", target_branch);
502 let output = Command::new("git")
503 .args(["merge-base", "--is-ancestor", "HEAD", &remote_ref])
504 .current_dir(repo_root)
505 .output()
506 .with_context(|| format!("check if HEAD is ancestor of {}", remote_ref))?;
507
508 if output.status.success() {
509 return Ok(true);
510 }
511 if output.status.code() == Some(1) {
512 return Ok(false);
513 }
514
515 let stderr = String::from_utf8_lossy(&output.stderr);
516 bail!(
517 "unable to verify push sync against {}: {}",
518 remote_ref,
519 stderr.trim()
520 );
521}
522
523#[allow(clippy::too_many_arguments)]
528fn build_agent_integration_prompt(
529 task_id: &str,
530 task_title: &str,
531 target_branch: &str,
532 queue_path: &Path,
533 done_path: &Path,
534 attempt: u32,
535 max_attempts: u32,
536 phase_summary: &str,
537 status_snapshot: &str,
538 ci_enabled: bool,
539 ci_command: Option<&str>,
540 previous_failure: Option<&str>,
541) -> String {
542 let queue_path_display = queue_path.display();
543 let done_path_display = done_path.display();
544 let failure_block = previous_failure.map_or_else(String::new, |failure| {
545 format!("\n## Previous Attempt Failed\n{}\n", failure)
546 });
547
548 let ci_block = if ci_enabled {
549 format!(
550 "- Run CI gate and fix failures before pushing: `{}`",
551 ci_command.unwrap_or("make ci")
552 )
553 } else {
554 "- CI gate is disabled for this task".to_string()
555 };
556
557 sanitize_prompt_for_runner(&format!(
558 r#"# Parallel Integration (Mandatory) - Attempt {attempt}/{max_attempts}
559You are finalizing task `{task_id}` (`{task_title}`) for direct push to `origin/{target_branch}`.
560
561## Hard Requirement
562You MUST execute integration git operations yourself in this turn. Do not stop early.
563You are NOT done until all required checks are satisfied.
564
565## Context
566- Phase summary: {phase_summary}
567- Current git status snapshot:
568```text
569{status_snapshot}
570```
571{failure_block}
572## Required Sequence
5731. `git fetch origin {target_branch}`
5742. Rebase on latest remote state: `git rebase origin/{target_branch}`
5753. If conflicts exist:
576 - Resolve every conflict marker while preserving both upstream and task intent.
577 - For queue/done files, preserve other workers' entries exactly.
578 - Ensure `{task_id}` is removed from queue and present as done in done.
579 - Continue rebase until complete (`git add ...`, `git rebase --continue`).
5804. Ensure bookkeeping is correct:
581 - `{queue_path_display}` does NOT contain `{task_id}`
582 - `{done_path_display}` DOES contain `{task_id}` with done status
5835. Stage and commit any remaining changes needed for integration.
5846. {ci_block}
5857. Push directly to base branch: `git push origin HEAD:{target_branch}`
5868. If push is rejected (non-fast-forward), repeat from step 1 in this same turn.
587
588## Completion Contract (Mandatory)
589Before ending your response:
590- No unresolved merge conflicts remain.
591- Push to `origin/{target_branch}` has succeeded.
592- Bookkeeping files are semantically correct for `{task_id}`.
593- CI has passed when enabled.
594
595If any check fails, keep working in this same turn until fixed.
596"#
597 ))
598}
599
600fn sanitize_prompt_for_runner(prompt: &str) -> String {
601 prompt
602 .chars()
603 .map(|c| {
604 if c.is_control() && c != '\n' && c != '\r' && c != '\t' {
605 ' '
606 } else {
607 c
608 }
609 })
610 .collect()
611}
612
613fn compose_block_reason(
614 compliance: &ComplianceResult,
615 pushed: bool,
616 extra: Option<&str>,
617) -> String {
618 let mut reasons = Vec::new();
619
620 if compliance.has_unresolved_conflicts {
621 reasons.push(format!(
622 "unresolved conflicts: {}",
623 compliance.conflict_files.join(", ")
624 ));
625 }
626 if !compliance.queue_done_valid {
627 reasons.push("queue/done semantic validation failed".to_string());
628 }
629 if !compliance.task_archived {
630 reasons.push("task archival validation failed".to_string());
631 }
632 if !compliance.ci_passed {
633 reasons.push("CI validation failed".to_string());
634 }
635 if !pushed {
636 reasons.push("HEAD is not yet integrated into target branch".to_string());
637 }
638 if let Some(extra) = extra {
639 reasons.push(extra.to_string());
640 }
641
642 if let Some(validation_error) = &compliance.validation_error {
643 reasons.push(validation_error.clone());
644 }
645
646 if reasons.is_empty() {
647 "integration did not satisfy completion contract".to_string()
648 } else {
649 reasons.join("; ")
650 }
651}
652
653#[allow(clippy::too_many_arguments)]
662pub(crate) fn run_integration_loop(
663 resolved: &Resolved,
664 task_id: &str,
665 task_title: &str,
666 config: &IntegrationConfig,
667 phase_summary: &str,
668 continue_session: &mut ContinueSession,
669 on_resume: &mut dyn FnMut(&crate::runner::RunnerOutput, Duration) -> Result<()>,
670 plugins: Option<&crate::plugins::registry::PluginRegistry>,
671) -> Result<IntegrationOutcome> {
672 let repo_root = &resolved.repo_root;
673 clear_blocked_push_marker(repo_root);
674 let mut previous_failure: Option<String> = None;
675
676 for attempt_index in 0..config.max_attempts {
677 let attempt = attempt_index + 1;
678 log::info!(
679 "Agent-owned integration attempt {}/{} for {}",
680 attempt,
681 config.max_attempts,
682 task_id
683 );
684
685 let status_snapshot = git::status_porcelain(repo_root).unwrap_or_default();
686 let prompt = build_agent_integration_prompt(
687 task_id,
688 task_title,
689 &config.target_branch,
690 &resolved.queue_path,
691 &resolved.done_path,
692 attempt,
693 config.max_attempts,
694 phase_summary,
695 &status_snapshot,
696 config.ci_enabled,
697 config.ci_command.as_deref(),
698 previous_failure.as_deref(),
699 );
700
701 let (output, elapsed) =
702 match resume_continue_session(resolved, continue_session, &prompt, plugins) {
703 Ok(resume) => resume,
704 Err(err) => {
705 let reason = format!("integration continuation failed: {:#}", err);
706 if attempt >= config.max_attempts {
707 if let Err(marker_err) = write_blocked_push_marker(
708 repo_root,
709 task_id,
710 &reason,
711 attempt,
712 config.max_attempts,
713 ) {
714 log::warn!("Failed to write blocked marker: {}", marker_err);
715 }
716 return Ok(IntegrationOutcome::BlockedPush { reason });
717 }
718 previous_failure = Some(reason);
719 std::thread::sleep(config.backoff_for_attempt(attempt_index as usize));
720 continue;
721 }
722 };
723
724 on_resume(&output, elapsed)?;
725
726 let compliance = run_compliance_checks(repo_root, resolved, task_id, config.ci_enabled)?;
727 let (pushed, push_check_error) =
728 match head_is_synced_to_remote(repo_root, &config.target_branch) {
729 Ok(value) => (value, None),
730 Err(err) => (false, Some(format!("push sync validation failed: {}", err))),
731 };
732
733 if compliance.all_passed() && pushed {
734 log::info!(
735 "Integration succeeded for {} on attempt {}/{}",
736 task_id,
737 attempt,
738 config.max_attempts
739 );
740 return Ok(IntegrationOutcome::Success);
741 }
742
743 let reason = compose_block_reason(&compliance, pushed, push_check_error.as_deref());
744 let mut handoff = RemediationHandoff::new(
745 task_id,
746 task_title,
747 &config.target_branch,
748 attempt,
749 config.max_attempts,
750 )
751 .with_conflicts(compliance.conflict_files.clone())
752 .with_git_status(git::status_porcelain(repo_root).unwrap_or_default())
753 .with_phase_summary(phase_summary.to_string())
754 .with_task_intent(format!("Complete task {}: {}", task_id, task_title));
755
756 if !compliance.ci_passed {
757 handoff = handoff.with_ci_context(
758 config
759 .ci_command
760 .clone()
761 .unwrap_or_else(|| "make ci".into()),
762 compliance
763 .validation_error
764 .clone()
765 .unwrap_or_else(|| "CI gate validation failed".to_string()),
766 1,
767 );
768 }
769
770 if let Err(err) = write_handoff_packet(repo_root, task_id, attempt, &handoff) {
771 log::warn!("Failed to persist remediation handoff packet: {}", err);
772 }
773
774 if attempt >= config.max_attempts {
775 if let Err(marker_err) =
776 write_blocked_push_marker(repo_root, task_id, &reason, attempt, config.max_attempts)
777 {
778 log::warn!("Failed to write blocked marker: {}", marker_err);
779 }
780 return Ok(IntegrationOutcome::BlockedPush { reason });
781 }
782
783 previous_failure = Some(reason);
784 std::thread::sleep(config.backoff_for_attempt(attempt_index as usize));
785 }
786
787 let reason = format!("integration exhausted {} attempts", config.max_attempts);
788 if let Err(marker_err) = write_blocked_push_marker(
789 repo_root,
790 task_id,
791 &reason,
792 config.max_attempts,
793 config.max_attempts,
794 ) {
795 log::warn!("Failed to write blocked marker: {}", marker_err);
796 }
797 Ok(IntegrationOutcome::BlockedPush { reason })
798}
799
800#[cfg(test)]
801mod tests {
802 use super::*;
803 use crate::contracts::{QueueFile, Task, TaskPriority, TaskStatus};
804 use std::collections::HashMap;
805 use tempfile::TempDir;
806
807 fn make_task(id: &str, status: TaskStatus) -> Task {
808 Task {
809 id: id.to_string(),
810 title: format!("Task {}", id),
811 description: None,
812 status,
813 priority: TaskPriority::Medium,
814 tags: vec![],
815 scope: vec![],
816 evidence: vec![],
817 plan: vec![],
818 notes: vec![],
819 request: None,
820 agent: None,
821 created_at: Some("2026-01-01T00:00:00Z".to_string()),
822 updated_at: Some("2026-01-01T00:00:00Z".to_string()),
823 completed_at: None,
824 started_at: None,
825 scheduled_start: None,
826 depends_on: vec![],
827 blocks: vec![],
828 relates_to: vec![],
829 duplicates: None,
830 custom_fields: HashMap::new(),
831 estimated_minutes: None,
832 actual_minutes: None,
833 parent_id: None,
834 }
835 }
836
837 #[test]
838 fn integration_config_default_backoff() {
839 let config = IntegrationConfig {
840 max_attempts: 5,
841 backoff_ms: vec![500, 2000, 5000, 10000],
842 target_branch: "main".into(),
843 ci_command: Some("make ci".into()),
844 ci_enabled: true,
845 };
846
847 assert_eq!(config.backoff_for_attempt(0), Duration::from_millis(500));
848 assert_eq!(config.backoff_for_attempt(1), Duration::from_millis(2000));
849 assert_eq!(config.backoff_for_attempt(2), Duration::from_millis(5000));
850 assert_eq!(config.backoff_for_attempt(3), Duration::from_millis(10000));
851 assert_eq!(config.backoff_for_attempt(4), Duration::from_millis(10000));
852 assert_eq!(config.backoff_for_attempt(10), Duration::from_millis(10000));
853 }
854
855 #[test]
856 fn remediation_handoff_builder() {
857 let handoff = RemediationHandoff::new("RQ-0001", "Test Task", "main", 2, 5)
858 .with_conflicts(vec!["src/lib.rs".into(), "src/main.rs".into()])
859 .with_git_status("UU src/lib.rs\nUU src/main.rs".into())
860 .with_phase_summary("Implemented feature X".into())
861 .with_task_intent("Complete feature X implementation".into());
862
863 assert_eq!(handoff.task_id, "RQ-0001");
864 assert_eq!(handoff.task_title, "Test Task");
865 assert_eq!(handoff.target_branch, "main");
866 assert_eq!(handoff.attempt, 2);
867 assert_eq!(handoff.max_attempts, 5);
868 assert_eq!(handoff.conflict_files.len(), 2);
869 assert_eq!(handoff.phase_summary, "Implemented feature X");
870 assert!(handoff.ci_context.is_none());
871 }
872
873 #[test]
874 fn remediation_handoff_with_ci() {
875 let handoff = RemediationHandoff::new("RQ-0001", "Test", "main", 1, 5).with_ci_context(
876 "make ci".into(),
877 "test failed".into(),
878 1,
879 );
880
881 assert!(handoff.ci_context.is_some());
882 let ci = handoff.ci_context.unwrap();
883 assert_eq!(ci.command, "make ci");
884 assert_eq!(ci.last_output, "test failed");
885 assert_eq!(ci.exit_code, 1);
886 }
887
888 #[test]
889 fn integration_prompt_contains_mandatory_contract() {
890 let prompt = build_agent_integration_prompt(
891 "RQ-0001",
892 "Implement feature",
893 "main",
894 Path::new("/tmp/queue.json"),
895 Path::new("/tmp/done.json"),
896 1,
897 5,
898 "phase summary",
899 " M src/lib.rs",
900 true,
901 Some("make ci"),
902 Some("previous failure"),
903 );
904
905 assert!(prompt.contains("MUST execute integration git operations"));
906 assert!(prompt.contains("Completion Contract (Mandatory)"));
907 assert!(prompt.contains("git push origin HEAD:main"));
908 assert!(prompt.contains("previous failure"));
909 }
910
911 #[test]
912 fn integration_prompt_uses_explicit_target_branch_for_push() {
913 let prompt = build_agent_integration_prompt(
914 "RQ-0001",
915 "Implement feature",
916 "release/2026",
917 Path::new("/tmp/queue.json"),
918 Path::new("/tmp/done.json"),
919 1,
920 5,
921 "phase summary",
922 " M src/lib.rs",
923 true,
924 Some("make ci"),
925 None,
926 );
927
928 assert!(prompt.contains("git fetch origin release/2026"));
929 assert!(prompt.contains("git rebase origin/release/2026"));
930 assert!(prompt.contains("git push origin HEAD:release/2026"));
931 }
932
933 #[test]
934 fn integration_prompt_sanitizes_nul_bytes() {
935 let prompt = build_agent_integration_prompt(
936 "RQ-0001",
937 "NUL test",
938 "main",
939 Path::new("/tmp/queue.json"),
940 Path::new("/tmp/done.json"),
941 1,
942 5,
943 "phase\0summary",
944 "status\0snapshot",
945 true,
946 Some("make ci"),
947 Some("previous\0failure"),
948 );
949
950 assert!(!prompt.contains('\0'));
951 assert!(prompt.contains("phase summary"));
952 assert!(prompt.contains("status snapshot"));
953 assert!(prompt.contains("previous failure"));
954 }
955
956 #[test]
957 fn compliance_result_all_passed() {
958 let passed = ComplianceResult {
959 has_unresolved_conflicts: false,
960 queue_done_valid: true,
961 task_archived: true,
962 ci_passed: true,
963 conflict_files: vec![],
964 validation_error: None,
965 };
966 assert!(passed.all_passed());
967
968 let failed = ComplianceResult {
969 has_unresolved_conflicts: false,
970 queue_done_valid: true,
971 task_archived: false,
972 ci_passed: true,
973 conflict_files: vec![],
974 validation_error: None,
975 };
976 assert!(!failed.all_passed());
977 }
978
979 #[test]
980 fn integration_config_uses_explicit_target_branch() -> Result<()> {
981 let dir = tempfile::TempDir::new()?;
982 let resolved = crate::config::Resolved {
983 config: crate::contracts::Config::default(),
984 repo_root: dir.path().to_path_buf(),
985 queue_path: dir.path().join(".ralph/queue.json"),
986 done_path: dir.path().join(".ralph/done.json"),
987 id_prefix: "RQ".to_string(),
988 id_width: 4,
989 global_config_path: None,
990 project_config_path: None,
991 };
992
993 let cfg = IntegrationConfig::from_resolved(&resolved, "release/2026");
994 assert_eq!(cfg.target_branch, "release/2026");
995 Ok(())
996 }
997
998 #[test]
999 fn task_archived_validation_uses_resolved_paths_not_workspace_local_files() -> Result<()> {
1000 let dir = TempDir::new()?;
1001 let coordinator = dir.path().join("coordinator");
1002 let worker_workspace = dir.path().join("worker-ws");
1003 std::fs::create_dir_all(&coordinator)?;
1004 std::fs::create_dir_all(worker_workspace.join(".ralph"))?;
1005
1006 let coordinator_queue = coordinator.join("queue.json");
1007 let coordinator_done = coordinator.join("done.json");
1008 let workspace_queue = worker_workspace.join(".ralph/queue.json");
1009 let workspace_done = worker_workspace.join(".ralph/done.json");
1010
1011 let mut coordinator_queue_file = QueueFile::default();
1012 coordinator_queue_file
1013 .tasks
1014 .push(make_task("RQ-0001", TaskStatus::Todo));
1015 queue::save_queue(&coordinator_queue, &coordinator_queue_file)?;
1016 queue::save_queue(&coordinator_done, &QueueFile::default())?;
1017
1018 queue::save_queue(&workspace_queue, &QueueFile::default())?;
1020 let mut workspace_done_file = QueueFile::default();
1021 workspace_done_file
1022 .tasks
1023 .push(make_task("RQ-0001", TaskStatus::Done));
1024 queue::save_queue(&workspace_done, &workspace_done_file)?;
1025
1026 let resolved = crate::config::Resolved {
1027 config: crate::contracts::Config::default(),
1028 repo_root: worker_workspace,
1029 queue_path: coordinator_queue.clone(),
1030 done_path: coordinator_done,
1031 id_prefix: "RQ".to_string(),
1032 id_width: 4,
1033 global_config_path: None,
1034 project_config_path: None,
1035 };
1036
1037 let err = validate_task_archived(&resolved, "RQ-0001")
1038 .expect_err("validation should use resolved queue path");
1039 let msg = err.to_string();
1040 assert!(
1041 msg.contains(coordinator_queue.to_string_lossy().as_ref()),
1042 "error should reference resolved queue path, got: {msg}"
1043 );
1044 Ok(())
1045 }
1046
1047 #[test]
1048 fn queue_done_semantics_validation_uses_resolved_paths() -> Result<()> {
1049 let dir = TempDir::new()?;
1050 let coordinator = dir.path().join("coordinator");
1051 let worker_workspace = dir.path().join("worker-ws");
1052 std::fs::create_dir_all(&coordinator)?;
1053 std::fs::create_dir_all(worker_workspace.join(".ralph"))?;
1054
1055 let coordinator_queue = coordinator.join("queue.json");
1056 let coordinator_done = coordinator.join("done.json");
1057 let workspace_queue = worker_workspace.join(".ralph/queue.json");
1058 let workspace_done = worker_workspace.join(".ralph/done.json");
1059
1060 let mut invalid_queue = QueueFile::default();
1062 invalid_queue
1063 .tasks
1064 .push(make_task("BAD-ID", TaskStatus::Todo));
1065 queue::save_queue(&coordinator_queue, &invalid_queue)?;
1066 queue::save_queue(&coordinator_done, &QueueFile::default())?;
1067
1068 let mut valid_queue = QueueFile::default();
1070 valid_queue
1071 .tasks
1072 .push(make_task("RQ-0001", TaskStatus::Todo));
1073 queue::save_queue(&workspace_queue, &valid_queue)?;
1074 queue::save_queue(&workspace_done, &QueueFile::default())?;
1075
1076 let resolved = crate::config::Resolved {
1077 config: crate::contracts::Config::default(),
1078 repo_root: worker_workspace.clone(),
1079 queue_path: coordinator_queue,
1080 done_path: coordinator_done,
1081 id_prefix: "RQ".to_string(),
1082 id_width: 4,
1083 global_config_path: None,
1084 project_config_path: None,
1085 };
1086
1087 validate_queue_done_semantics(&worker_workspace, &resolved)
1088 .expect_err("validation should fail from resolved queue path");
1089 Ok(())
1090 }
1091
1092 #[test]
1093 fn blocked_marker_roundtrip() -> Result<()> {
1094 let temp = TempDir::new()?;
1095 write_blocked_push_marker(temp.path(), "RQ-0001", "blocked reason", 5, 5)?;
1096 let marker = read_blocked_push_marker(temp.path())?.expect("marker should exist");
1097 assert_eq!(marker.task_id, "RQ-0001");
1098 assert_eq!(marker.reason, "blocked reason");
1099 assert_eq!(marker.attempt, 5);
1100 assert_eq!(marker.max_attempts, 5);
1101
1102 clear_blocked_push_marker(temp.path());
1103 assert!(read_blocked_push_marker(temp.path())?.is_none());
1104 Ok(())
1105 }
1106}