async fn run_issue_fix_worker_session(
state: &AppState,
record: &CoderRunRecord,
task_id: Option<&str>,
prompt: String,
worker_kind: &str,
artifact_type: &str,
relative_path: &str,
) -> Result<(ContextBlackboardArtifact, Value), StatusCode> {
let model = resolve_coder_worker_model_spec(state, record)
.await
.unwrap_or(tandem_types::ModelSpec {
provider_id: "local".to_string(),
model_id: "echo-1".to_string(),
});
let workflow_label = match record.workflow_mode {
CoderWorkflowMode::IssueTriage => "Issue Triage",
CoderWorkflowMode::IssueFix => "Issue Fix",
CoderWorkflowMode::PrReview => "PR Review",
CoderWorkflowMode::MergeRecommendation => "Merge Recommendation",
};
let session_title = format!(
"Coder {workflow_label} {} / {}",
record.coder_run_id, worker_kind
);
let managed_worktree = prepare_coder_worker_workspace(
state,
&record.repo_binding.workspace_root,
task_id,
&record.linked_context_run_id,
worker_kind,
)
.await;
let canonical_repo_root = managed_worktree
.as_ref()
.map(|result| result.record.repo_root.clone())
.or_else(|| {
crate::runtime::worktrees::resolve_git_repo_root(&record.repo_binding.workspace_root)
})
.unwrap_or_else(|| record.repo_binding.workspace_root.clone());
let worker_workspace_root = managed_worktree
.as_ref()
.map(|result| result.record.path.clone())
.unwrap_or_else(|| record.repo_binding.workspace_root.clone());
let result = async {
let mut session = Session::new(
Some(session_title),
Some(worker_workspace_root.clone()),
);
session.project_id = Some(record.repo_binding.project_id.clone());
session.workspace_root = Some(worker_workspace_root.clone());
session.environment = Some(state.host_runtime_context());
session.provider = Some(model.provider_id.clone());
session.model = Some(model.clone());
let session_id = session.id.clone();
state
.storage
.save_session(session.clone())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let worker_context_run_id =
super::context_runs::ensure_session_context_run(state, &session).await?;
let run_id = Uuid::new_v4().to_string();
let client_id = Some(record.coder_run_id.clone());
let agent_id = Some("coder_issue_fix_worker".to_string());
let tenant_context = session.tenant_context.clone();
let active_run = state
.run_registry
.acquire(
&session_id,
run_id.clone(),
client_id.clone(),
agent_id.clone(),
agent_id.clone(),
)
.await
.map_err(|_| StatusCode::CONFLICT)?;
state.event_bus.publish(EngineEvent::new(
"session.run.started",
serde_json::json!({
"sessionID": session_id,
"runID": run_id,
"startedAtMs": active_run.started_at_ms,
"clientID": active_run.client_id,
"agentID": active_run.agent_id,
"agentProfile": active_run.agent_profile,
"environment": state.host_runtime_context(),
"tenantContext": tenant_context.clone(),
}),
));
let strict_issue_fix_worker =
record.workflow_mode == CoderWorkflowMode::IssueFix && worker_kind.starts_with("issue_fix_");
let request = SendMessageRequest {
parts: vec![MessagePartInput::Text {
text: build_coder_worker_contract_prompt(
&worker_workspace_root,
&canonical_repo_root,
worker_kind,
&prompt,
),
}],
model: Some(model.clone()),
agent: agent_id.clone().or_else(|| Some(worker_kind.to_string())),
tool_mode: Some(if strict_issue_fix_worker {
tandem_types::ToolMode::Required
} else {
tandem_types::ToolMode::Auto
}),
tool_allowlist: None,
strict_kb_grounding: None,
context_mode: Some(tandem_types::ContextMode::Full),
write_required: Some(true),
prewrite_requirements: strict_issue_fix_worker.then_some(
tandem_types::PrewriteRequirements {
workspace_inspection_required: true,
concrete_read_required: true,
web_research_required: false,
successful_web_research_required: false,
repair_on_unmet_requirements: true,
repair_budget: Some(3),
repair_exhaustion_behavior: Some(
tandem_types::PrewriteRepairExhaustionBehavior::FailClosed,
),
coverage_mode: tandem_types::PrewriteCoverageMode::FilesReviewedBacked,
},
),
};
state
.engine_loop
.set_session_allowed_tools(
&session_id,
crate::normalize_allowed_tools(vec!["*".to_string()]),
)
.await;
let run_result = super::sessions::execute_run(
state.clone(),
session_id.clone(),
run_id.clone(),
request,
Some(format!("coder:{}:{worker_kind}", record.coder_run_id)),
client_id,
tenant_context.clone(),
)
.await;
state
.engine_loop
.clear_session_allowed_tools(&session_id)
.await;
let session = state
.storage
.get_session(&session_id)
.await
.ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
let assistant_text = latest_assistant_session_text(&session);
let tool_invocation_count = count_session_tool_invocations(&session);
let changed_file_entries = extract_session_change_evidence(&session);
let session_changed_files = changed_file_entries
.iter()
.filter_map(|row| {
row.get("path")
.and_then(Value::as_str)
.map(ToString::to_string)
})
.collect::<Vec<_>>();
let git_evidence = collect_git_handoff_evidence(&worker_workspace_root).await;
let mut changed_files = git_evidence
.get("changed_files")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(ToString::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_default();
for path in session_changed_files {
if !changed_files.contains(&path) {
changed_files.push(path);
}
}
let payload = json!({
"coder_run_id": record.coder_run_id,
"linked_context_run_id": record.linked_context_run_id,
"workflow_mode": record.workflow_mode,
"repo_binding": record.repo_binding,
"github_ref": record.github_ref,
"worker_kind": worker_kind,
"task_id": task_id,
"worker_workspace_root": worker_workspace_root,
"worker_workspace_repo_root": canonical_repo_root,
"worker_workspace_branch": managed_worktree.as_ref().map(|row| row.record.branch.clone()),
"worker_workspace_reused": managed_worktree.as_ref().map(|row| row.reused),
"worker_workspace_cleanup_branch": managed_worktree.as_ref().map(|row| row.record.cleanup_branch),
"managed_worktree": managed_worktree.as_ref().map(|row| json!({
"key": row.record.key,
"path": row.record.path,
"repo_root": row.record.repo_root,
"branch": row.record.branch,
"base": row.record.base,
"task_id": row.record.task_id,
"owner_run_id": row.record.owner_run_id,
"lease_id": row.record.lease_id,
"cleanup_branch": row.record.cleanup_branch,
"reused": row.reused,
})),
"session_id": session_id,
"session_run_id": run_id,
"session_context_run_id": worker_context_run_id,
"worker_run_reference": worker_context_run_id,
"status": if run_result.is_ok() { "completed" } else { "error" },
"model": model,
"agent_id": agent_id,
"prompt": prompt,
"assistant_text": assistant_text,
"tool_invocation_count": tool_invocation_count,
"changed_files": changed_files,
"changed_file_entries": changed_file_entries,
"git_evidence": git_evidence,
"message_count": session.messages.len(),
"messages": compact_session_messages(&session),
"error": run_result.as_ref().err().map(|error| crate::truncate_text(&error.to_string(), 500)),
"created_at_ms": crate::now_ms(),
});
let artifact = write_coder_artifact(
state,
&record.linked_context_run_id,
&format!("{worker_kind}-worker-session-{}", Uuid::new_v4().simple()),
artifact_type,
relative_path,
&payload,
)
.await?;
publish_coder_artifact_added(state, record, &artifact, Some("analysis"), {
let mut extra = serde_json::Map::new();
extra.insert("kind".to_string(), json!("worker_session"));
if let Some(session_id) = payload.get("session_id").cloned() {
extra.insert("session_id".to_string(), session_id);
}
if let Some(session_run_id) = payload.get("session_run_id").cloned() {
extra.insert("session_run_id".to_string(), session_run_id);
}
if let Some(session_context_run_id) = payload.get("session_context_run_id").cloned() {
extra.insert("session_context_run_id".to_string(), session_context_run_id);
}
extra.insert("worker_kind".to_string(), json!(worker_kind));
if let Some(branch) = payload.get("worker_workspace_branch").cloned() {
extra.insert("worker_workspace_branch".to_string(), branch);
}
extra
});
Ok::<_, StatusCode>((artifact, payload, run_result.is_ok()))
}
.await;
let preserve_worktree = worker_kind.starts_with("issue_fix_");
if !preserve_worktree {
if let Some(worktree) = managed_worktree.as_ref() {
let _ =
crate::runtime::worktrees::delete_managed_worktree(state, &worktree.record).await;
}
}
let (artifact, payload, run_ok) = result?;
if !run_ok {
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
Ok((artifact, payload))
}
async fn prepare_coder_worker_workspace(
state: &AppState,
workspace_root: &str,
task_id: Option<&str>,
owner_run_id: &str,
worker_kind: &str,
) -> Option<crate::runtime::worktrees::ManagedWorktreeEnsureResult> {
let repo_root = crate::runtime::worktrees::resolve_git_repo_root(workspace_root)?;
crate::runtime::worktrees::ensure_managed_worktree(
state,
crate::runtime::worktrees::ManagedWorktreeEnsureInput {
repo_root,
task_id: task_id.map(ToString::to_string),
owner_run_id: Some(owner_run_id.to_string()),
lease_id: None,
branch_hint: Some(worker_kind.to_string()),
base: "HEAD".to_string(),
cleanup_branch: true,
},
)
.await
.ok()
}
fn build_coder_worker_contract_prompt(
worker_workspace_root: &str,
canonical_repo_root: &str,
worker_kind: &str,
prompt: &str,
) -> String {
format!(
concat!(
"Managed worker workspace: {worker_workspace_root}\n",
"Canonical repo root: {canonical_repo_root}\n",
"Worker kind: {worker_kind}\n\n",
"Tandem coder contract:\n",
"1. Inspect the repository before deciding what to edit. Prefer fast file search and read concrete files before patching.\n",
"2. Check scoped agent instructions such as AGENTS.md when they exist in or above files you touch.\n",
"3. For code-change work, produce an actual workspace diff. Use edit/apply_patch/write tools; a prose-only answer is not complete.\n",
"4. Keep the change constrained to the issue, run targeted validation, and repair the smallest root cause if validation fails.\n",
"5. End with Summary, Changed Files, Validation, Residual Risk, and Handoff headings.\n\n",
"{prompt}"
),
worker_workspace_root = worker_workspace_root,
canonical_repo_root = canonical_repo_root,
worker_kind = worker_kind,
prompt = prompt,
)
}
async fn collect_git_handoff_evidence(workspace_root: &str) -> Value {
let workspace_root = workspace_root.to_string();
tokio::task::spawn_blocking(move || {
let status = std::process::Command::new("git")
.args(["-C", &workspace_root, "status", "--porcelain"])
.output();
let diff = std::process::Command::new("git")
.args(["-C", &workspace_root, "diff", "--", "."])
.output();
let head = std::process::Command::new("git")
.args(["-C", &workspace_root, "rev-parse", "HEAD"])
.output();
let branch = std::process::Command::new("git")
.args(["-C", &workspace_root, "branch", "--show-current"])
.output();
let status_text = status
.as_ref()
.ok()
.filter(|output| output.status.success())
.map(|output| String::from_utf8_lossy(&output.stdout).to_string())
.unwrap_or_default();
let mut changed_files = status_text
.lines()
.filter_map(|line| {
let path = line.get(3..).unwrap_or_default().trim();
if path.is_empty() {
None
} else {
Some(path.trim_matches('"').to_string())
}
})
.collect::<Vec<_>>();
changed_files.sort();
changed_files.dedup();
json!({
"workspace_root": workspace_root,
"changed_files": changed_files,
"status_porcelain": status_text,
"diff": diff
.as_ref()
.ok()
.filter(|output| output.status.success())
.map(|output| crate::truncate_text(String::from_utf8_lossy(&output.stdout).as_ref(), 48_000))
.unwrap_or_default(),
"commit_sha": head
.as_ref()
.ok()
.filter(|output| output.status.success())
.map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string()),
"branch_name": branch
.as_ref()
.ok()
.filter(|output| output.status.success())
.map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string()),
"ok": status.as_ref().map(|output| output.status.success()).unwrap_or(false),
})
})
.await
.unwrap_or_else(|error| {
json!({
"ok": false,
"error": crate::truncate_text(&error.to_string(), 500),
"changed_files": [],
})
})
}
async fn update_coder_run_handoff_from_worker(
state: &AppState,
record: &CoderRunRecord,
worker_payload: &Value,
validation_status: Option<&str>,
handoff_status: Option<&str>,
completion_gate: Option<Value>,
) -> Result<CoderRunRecord, StatusCode> {
let mut updated = load_coder_run_record(state, &record.coder_run_id).await?;
if let Some(session_id) = worker_payload
.get("session_id")
.and_then(Value::as_str)
.map(ToString::to_string)
{
updated.worker_session_id = Some(session_id);
}
if let Some(run_id) = worker_payload
.get("session_run_id")
.and_then(Value::as_str)
.map(ToString::to_string)
{
updated.worker_run_id = Some(run_id);
}
if let Some(managed_worktree) = worker_payload.get("managed_worktree").cloned() {
updated.managed_worktree = Some(managed_worktree);
}
if let Some(branch_name) = worker_payload
.get("git_evidence")
.and_then(|row| row.get("branch_name"))
.and_then(Value::as_str)
.map(ToString::to_string)
.or_else(|| {
worker_payload
.get("worker_workspace_branch")
.and_then(Value::as_str)
.map(ToString::to_string)
})
{
updated.branch_name = Some(branch_name);
}
if let Some(commit_sha) = worker_payload
.get("git_evidence")
.and_then(|row| row.get("commit_sha"))
.and_then(Value::as_str)
.map(ToString::to_string)
{
updated.commit_sha = Some(commit_sha);
}
let changed_files = worker_payload
.get("changed_files")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(ToString::to_string)
.collect::<Vec<_>>()
})
.filter(|rows| !rows.is_empty());
if changed_files.is_some() {
updated.changed_files = changed_files;
}
if let Some(validation_status) = validation_status {
updated.validation_status = Some(validation_status.to_string());
}
if let Some(handoff_status) = handoff_status {
updated.handoff_status = Some(handoff_status.to_string());
}
if let Some(completion_gate) = completion_gate {
updated.completion_gate = Some(completion_gate);
}
updated.updated_at_ms = crate::now_ms();
save_coder_run_record(state, &updated).await?;
Ok(updated)
}
fn worker_payload_has_patch(worker_payload: &Value) -> bool {
worker_payload
.get("changed_files")
.and_then(Value::as_array)
.is_some_and(|rows| !rows.is_empty())
|| worker_payload
.get("git_evidence")
.and_then(|row| row.get("diff"))
.and_then(Value::as_str)
.map(str::trim)
.is_some_and(|text| !text.is_empty())
}
async fn run_issue_fix_prepare_worker(
state: &AppState,
record: &CoderRunRecord,
run: &ContextRunState,
task_id: Option<&str>,
) -> Result<(ContextBlackboardArtifact, Value), StatusCode> {
let prompt = build_issue_fix_worker_prompt(
record,
run,
&summarize_workflow_memory_hits(record, run, "retrieve_memory"),
);
run_issue_fix_worker_session(
state,
record,
task_id,
prompt,
"issue_fix_prepare",
"coder_issue_fix_worker_session",
"artifacts/issue_fix.worker_session.json",
)
.await
}
fn build_issue_fix_validation_worker_prompt(
record: &CoderRunRecord,
run: &ContextRunState,
plan_payload: Option<&Value>,
memory_hits_used: &[String],
) -> String {
let issue_number = record
.github_ref
.as_ref()
.map(|row| row.number)
.unwrap_or_default();
let plan_summary = plan_payload
.and_then(|payload| payload.get("summary"))
.and_then(Value::as_str)
.unwrap_or("No structured fix summary was recorded.");
let fix_strategy = plan_payload
.and_then(|payload| payload.get("fix_strategy"))
.and_then(Value::as_str)
.unwrap_or("No fix strategy was recorded.");
let validation_hints = plan_payload
.and_then(|payload| payload.get("validation_steps"))
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.collect::<Vec<_>>()
.join(", ")
})
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "no explicit validation hints".to_string());
let memory_hint = if memory_hits_used.is_empty() {
"none".to_string()
} else {
memory_hits_used.join(", ")
};
format!(
concat!(
"You are the Tandem coder issue-fix validation worker.\n",
"Repository: {repo_slug}\n",
"Workspace root: {workspace_root}\n",
"Issue number: #{issue_number}\n",
"Context run ID: {context_run_id}\n",
"Fix plan summary: {plan_summary}\n",
"Fix strategy: {fix_strategy}\n",
"Validation hints: {validation_hints}\n",
"Memory hits already surfaced: {memory_hint}\n\n",
"Task:\n",
"1. Inspect the current workspace state.\n",
"2. Run or describe targeted validation for the proposed fix.\n",
"3. Report residual risks or follow-up work.\n\n",
"Return a compact response with these headings:\n",
"Summary:\n",
"Validation:\n",
"Risks:\n"
),
repo_slug = record.repo_binding.repo_slug,
workspace_root = record.repo_binding.workspace_root,
issue_number = issue_number,
context_run_id = run.run_id,
plan_summary = plan_summary,
fix_strategy = fix_strategy,
validation_hints = validation_hints,
memory_hint = memory_hint,
)
}
async fn run_issue_fix_validation_worker(
state: &AppState,
record: &CoderRunRecord,
run: &ContextRunState,
plan_payload: Option<&Value>,
task_id: Option<&str>,
) -> Result<(ContextBlackboardArtifact, Value), StatusCode> {
let prompt = build_issue_fix_validation_worker_prompt(
record,
run,
plan_payload,
&summarize_workflow_memory_hits(record, run, "retrieve_memory"),
);
run_issue_fix_worker_session(
state,
record,
task_id,
prompt,
"issue_fix_validation",
"coder_issue_fix_validation_session",
"artifacts/issue_fix.validation_session.json",
)
.await
}
async fn run_pr_review_worker(
state: &AppState,
record: &CoderRunRecord,
run: &ContextRunState,
task_id: Option<&str>,
) -> Result<(ContextBlackboardArtifact, Value), StatusCode> {
let prompt = build_pr_review_worker_prompt(
record,
run,
&summarize_workflow_memory_hits(record, run, "retrieve_memory"),
);
run_issue_fix_worker_session(
state,
record,
task_id,
prompt,
"pr_review_analysis",
"coder_pr_review_worker_session",
"artifacts/pr_review.worker_session.json",
)
.await
}
async fn run_issue_triage_worker(
state: &AppState,
record: &CoderRunRecord,
run: &ContextRunState,
task_id: Option<&str>,
) -> Result<(ContextBlackboardArtifact, Value), StatusCode> {
let prompt = build_issue_triage_worker_prompt(
record,
run,
&summarize_workflow_memory_hits(record, run, "retrieve_memory"),
);
run_issue_fix_worker_session(
state,
record,
task_id,
prompt,
"issue_triage_analysis",
"coder_issue_triage_worker_session",
"artifacts/triage.worker_session.json",
)
.await
}
async fn run_merge_recommendation_worker(
state: &AppState,
record: &CoderRunRecord,
run: &ContextRunState,
task_id: Option<&str>,
) -> Result<(ContextBlackboardArtifact, Value), StatusCode> {
let prompt = build_merge_recommendation_worker_prompt(
record,
run,
&summarize_workflow_memory_hits(record, run, "retrieve_memory"),
);
run_issue_fix_worker_session(
state,
record,
task_id,
prompt,
"merge_recommendation_analysis",
"coder_merge_recommendation_worker_session",
"artifacts/merge_recommendation.worker_session.json",
)
.await
}
fn coder_run_payload(record: &CoderRunRecord, context_run: &ContextRunState) -> Value {
json!({
"coder_run_id": record.coder_run_id,
"workflow_mode": record.workflow_mode,
"linked_context_run_id": record.linked_context_run_id,
"repo_binding": record.repo_binding,
"github_ref": record.github_ref,
"source_client": record.source_client,
"model_provider": record.model_provider,
"model_id": record.model_id,
"parent_coder_run_id": record.parent_coder_run_id,
"origin": record.origin,
"origin_artifact_type": record.origin_artifact_type,
"origin_policy": record.origin_policy,
"github_project_ref": record.github_project_ref,
"remote_sync_state": coder_run_sync_state(record),
"worker_session_id": record.worker_session_id,
"worker_run_id": record.worker_run_id,
"managed_worktree": record.managed_worktree,
"branch_name": record.branch_name,
"commit_sha": record.commit_sha,
"pr_url": record.pr_url,
"changed_files": record.changed_files,
"validation_status": record.validation_status,
"handoff_status": record.handoff_status,
"completion_gate": record.completion_gate,
"status": context_run.status,
"phase": project_coder_phase(context_run),
"created_at_ms": record.created_at_ms,
"updated_at_ms": context_run.updated_at_ms,
})
}
fn same_coder_github_ref(left: Option<&CoderGithubRef>, right: Option<&CoderGithubRef>) -> bool {
match (left, right) {
(Some(left), Some(right)) => left.kind == right.kind && left.number == right.number,
(None, None) => true,
_ => false,
}
}
async fn has_completed_follow_on_pr_review(
state: &AppState,
record: &CoderRunRecord,
) -> Result<bool, StatusCode> {
Ok(find_completed_follow_on_pr_review(state, record)
.await?
.is_some())
}
async fn find_completed_follow_on_pr_review(
state: &AppState,
record: &CoderRunRecord,
) -> Result<Option<CoderRunRecord>, StatusCode> {
let Some(parent_coder_run_id) = record.parent_coder_run_id.as_deref() else {
return Ok(None);
};
let mut latest_completed: Option<(CoderRunRecord, u64)> = None;
ensure_coder_runs_dir(state).await?;
let mut dir = tokio::fs::read_dir(coder_runs_root(state))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
while let Ok(Some(entry)) = dir.next_entry().await {
if !entry
.file_type()
.await
.map(|row| row.is_file())
.unwrap_or(false)
{
continue;
}
let raw = tokio::fs::read_to_string(entry.path())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let Ok(candidate) = serde_json::from_str::<CoderRunRecord>(&raw) else {
continue;
};
if candidate.coder_run_id == record.coder_run_id
|| candidate.parent_coder_run_id.as_deref() != Some(parent_coder_run_id)
|| candidate.workflow_mode != CoderWorkflowMode::PrReview
|| !same_coder_github_ref(candidate.github_ref.as_ref(), record.github_ref.as_ref())
{
continue;
}
let Ok(run) = load_context_run_state(state, &candidate.linked_context_run_id).await else {
continue;
};
if matches!(run.status, ContextRunStatus::Completed) {
let candidate_updated_at = run.updated_at_ms;
if latest_completed
.as_ref()
.is_none_or(|(_, best_updated_at)| candidate_updated_at >= *best_updated_at)
{
latest_completed = Some((candidate, candidate_updated_at));
}
}
}
Ok(latest_completed.map(|(record, _)| record))
}
async fn merge_submit_review_policy_block(
state: &AppState,
record: &CoderRunRecord,
) -> Result<Option<Value>, StatusCode> {
let source = record
.origin_policy
.as_ref()
.and_then(|row| row.get("source"))
.and_then(Value::as_str);
if source != Some("issue_fix_pr_submit") {
return Ok(None);
}
let Some(review_record) = find_completed_follow_on_pr_review(state, record).await? else {
return Ok(Some(json!({
"reason": "requires_approved_pr_review_follow_on",
"required_workflow_mode": "pr_review",
"parent_coder_run_id": record.parent_coder_run_id,
"review_completed": false,
})));
};
let Some(review_summary) =
load_latest_coder_artifact_payload(state, &review_record, "coder_pr_review_summary").await
else {
return Ok(Some(json!({
"reason": "requires_approved_pr_review_follow_on",
"required_workflow_mode": "pr_review",
"parent_coder_run_id": record.parent_coder_run_id,
"review_completed": true,
"review_summary_present": false,
})));
};
let verdict = review_summary
.get("verdict")
.and_then(Value::as_str)
.map(str::trim)
.unwrap_or_default()
.to_ascii_lowercase();
let has_blockers = review_summary
.get("blockers")
.and_then(Value::as_array)
.is_some_and(|rows| !rows.is_empty());
let has_requested_changes = review_summary
.get("requested_changes")
.and_then(Value::as_array)
.is_some_and(|rows| !rows.is_empty());
if verdict == "approve" && !has_blockers && !has_requested_changes {
return Ok(None);
}
Ok(Some(json!({
"reason": "requires_approved_pr_review_follow_on",
"required_workflow_mode": "pr_review",
"parent_coder_run_id": record.parent_coder_run_id,
"review_completed": true,
"review_summary_present": true,
"review_verdict": review_summary.get("verdict").cloned().unwrap_or(Value::Null),
"has_blockers": has_blockers,
"has_requested_changes": has_requested_changes,
})))
}
fn merge_submit_auto_mode_policy_block(record: &CoderRunRecord) -> Option<Value> {
let origin_policy = record.origin_policy.as_ref();
let merge_auto_spawn_opted_in = origin_policy
.and_then(|row| row.get("merge_auto_spawn_opted_in"))
.and_then(Value::as_bool)
.unwrap_or(false);
if !merge_auto_spawn_opted_in {
return Some(json!({
"reason": "requires_explicit_auto_merge_submit_opt_in",
"submit_mode": "auto",
"merge_auto_spawn_opted_in": false,
}));
}
let spawn_mode = origin_policy
.and_then(|row| row.get("spawn_mode"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("unknown");
if spawn_mode != "auto" {
return Some(json!({
"reason": "requires_auto_spawned_merge_follow_on",
"submit_mode": "auto",
"merge_auto_spawn_opted_in": true,
"spawn_mode": spawn_mode,
}));
}
None
}
fn merge_submit_request_readiness_block(merge_request_payload: &Value) -> Option<Value> {
let recommendation = merge_request_payload
.get("recommendation")
.and_then(Value::as_str)
.map(str::trim)
.unwrap_or_default()
.to_ascii_lowercase();
let has_blockers = merge_request_payload
.get("blockers")
.and_then(Value::as_array)
.is_some_and(|rows| !rows.is_empty());
let has_required_checks = merge_request_payload
.get("required_checks")
.and_then(Value::as_array)
.is_some_and(|rows| !rows.is_empty());
let has_required_approvals = merge_request_payload
.get("required_approvals")
.and_then(Value::as_array)
.is_some_and(|rows| !rows.is_empty());
if recommendation == "merge" && !has_blockers && !has_required_checks && !has_required_approvals
{
return None;
}
Some(json!({
"reason": "merge_execution_request_not_merge_ready",
"recommendation": merge_request_payload.get("recommendation").cloned().unwrap_or(Value::Null),
"has_blockers": has_blockers,
"has_required_checks": has_required_checks,
"has_required_approvals": has_required_approvals,
}))
}
fn blocked_merge_submit_policy(mode: &str, policy: Value) -> Value {
json!({
"blocked": true,
"code": "CODER_MERGE_SUBMIT_POLICY_BLOCKED",
"submit_mode": mode,
"policy": policy,
})
}
fn allowed_merge_submit_policy(mode: &str) -> Value {
json!({
"blocked": false,
"submit_mode": mode,
"eligible": true,
})
}
fn merge_submit_policy_envelope(
manual: Value,
auto: Value,
preferred_submit_mode: &str,
auto_execute_eligible: bool,
auto_execute_policy_enabled: bool,
auto_execute_block_reason: &str,
) -> Value {
json!({
"manual": manual,
"auto": auto,
"preferred_submit_mode": preferred_submit_mode,
"explicit_submit_required": true,
"auto_execute_after_approval": false,
"auto_execute_eligible": auto_execute_eligible,
"auto_execute_policy_enabled": auto_execute_policy_enabled,
"auto_execute_block_reason": auto_execute_block_reason,
})
}
fn blocked_policy_reason(policy: &Value) -> Option<&str> {
policy.get("reason").and_then(Value::as_str).or_else(|| {
policy
.get("policy")
.and_then(|row| row.get("reason"))
.and_then(Value::as_str)
})
}
async fn coder_merge_submit_policy_summary(
state: &AppState,
record: &CoderRunRecord,
) -> Result<Value, StatusCode> {
if record.workflow_mode != CoderWorkflowMode::MergeRecommendation {
return Ok(Value::Null);
}
let project_policy = load_coder_project_policy(state, &record.repo_binding.project_id).await?;
let Some(merge_request_payload) =
load_latest_coder_artifact_payload(state, record, "coder_merge_execution_request").await
else {
return Ok(merge_submit_policy_envelope(
blocked_merge_submit_policy(
"manual",
json!({
"reason": "requires_merge_execution_request",
}),
),
blocked_merge_submit_policy(
"auto",
json!({
"reason": "requires_merge_execution_request",
"merge_auto_spawn_opted_in": record
.origin_policy
.as_ref()
.and_then(|row| row.get("merge_auto_spawn_opted_in"))
.cloned()
.unwrap_or_else(|| json!(false)),
}),
),
"manual",
false,
project_policy.auto_merge_enabled,
"requires_merge_execution_request",
));
};
if let Some(policy) = merge_submit_request_readiness_block(&merge_request_payload) {
let block_reason = blocked_policy_reason(&policy)
.unwrap_or("merge_submit_blocked")
.to_string();
return Ok(merge_submit_policy_envelope(
blocked_merge_submit_policy("manual", policy.clone()),
blocked_merge_submit_policy("auto", policy),
"manual",
false,
project_policy.auto_merge_enabled,
&block_reason,
));
}
if let Some(policy) = merge_submit_review_policy_block(state, record).await? {
let auto_policy =
merge_submit_auto_mode_policy_block(record).unwrap_or_else(|| policy.clone());
let block_reason = blocked_policy_reason(&policy)
.unwrap_or("merge_submit_blocked")
.to_string();
return Ok(merge_submit_policy_envelope(
blocked_merge_submit_policy("manual", policy),
blocked_merge_submit_policy("auto", auto_policy),
"manual",
false,
project_policy.auto_merge_enabled,
&block_reason,
));
}
let auto = if let Some(policy) = merge_submit_auto_mode_policy_block(record) {
blocked_merge_submit_policy("auto", policy)
} else {
allowed_merge_submit_policy("auto")
};
let preferred_submit_mode = if auto
.get("blocked")
.and_then(Value::as_bool)
.unwrap_or(false)
{
"manual"
} else {
"auto"
};
let auto_execute_eligible =
project_policy.auto_merge_enabled && preferred_submit_mode == "auto";
let auto_execute_block_reason = if !project_policy.auto_merge_enabled {
"project_auto_merge_policy_disabled".to_string()
} else if preferred_submit_mode == "manual" {
blocked_policy_reason(&auto)
.unwrap_or("preferred_submit_mode_manual")
.to_string()
} else {
"explicit_submit_required_policy".to_string()
};
Ok(merge_submit_policy_envelope(
allowed_merge_submit_policy("manual"),
auto,
preferred_submit_mode,
auto_execute_eligible,
project_policy.auto_merge_enabled,
&auto_execute_block_reason,
))
}
async fn coder_execution_policy_block(
state: &AppState,
record: &CoderRunRecord,
) -> Result<Option<Value>, StatusCode> {
if record.workflow_mode != CoderWorkflowMode::MergeRecommendation {
return Ok(None);
}
let source = record
.origin_policy
.as_ref()
.and_then(|row| row.get("source"))
.and_then(Value::as_str);
if source != Some("issue_fix_pr_submit") {
return Ok(None);
}
if has_completed_follow_on_pr_review(state, record).await? {
return Ok(None);
}
Ok(Some(json!({
"ok": false,
"error": "merge recommendation is blocked until a sibling pr_review run completes",
"code": "CODER_EXECUTION_POLICY_BLOCKED",
"policy": {
"reason": "requires_completed_pr_review_follow_on",
"required_workflow_mode": "pr_review",
"parent_coder_run_id": record.parent_coder_run_id,
}
})))
}
async fn coder_execution_policy_summary(
state: &AppState,
record: &CoderRunRecord,
) -> Result<Value, StatusCode> {
if let Some(blocked) = coder_execution_policy_block(state, record).await? {
let policy = blocked.get("policy").cloned().unwrap_or_else(|| json!({}));
return Ok(json!({
"blocked": true,
"code": blocked.get("code").cloned().unwrap_or_else(|| json!("CODER_EXECUTION_POLICY_BLOCKED")),
"error": blocked.get("error").cloned().unwrap_or_else(|| json!("coder execution blocked by policy")),
"policy": policy,
}));
}
Ok(json!({
"blocked": false,
}))
}
async fn emit_coder_execution_policy_block(
state: &AppState,
record: &CoderRunRecord,
blocked: &Value,
) -> Result<(), StatusCode> {
publish_coder_run_event(
state,
"coder.run.phase_changed",
record,
Some("policy_blocked"),
{
let mut extra = serde_json::Map::new();
extra.insert("event_type".to_string(), json!("execution_policy_blocked"));
extra.insert(
"code".to_string(),
blocked
.get("code")
.cloned()
.unwrap_or_else(|| json!("CODER_EXECUTION_POLICY_BLOCKED")),
);
extra.insert(
"policy".to_string(),
blocked.get("policy").cloned().unwrap_or_else(|| json!({})),
);
extra
},
);
Ok(())
}
fn follow_on_execution_policy_preview(
workflow_mode: &CoderWorkflowMode,
required_completed_workflow_modes: &[Value],
) -> Value {
if matches!(workflow_mode, CoderWorkflowMode::MergeRecommendation)
&& !required_completed_workflow_modes.is_empty()
{
return json!({
"blocked": true,
"code": "CODER_EXECUTION_POLICY_BLOCKED",
"error": "merge recommendation is blocked until required review follow-ons complete",
"policy": {
"reason": "requires_completed_pr_review_follow_on",
"required_completed_workflow_modes": required_completed_workflow_modes,
}
});
}
json!({
"blocked": false,
})
}
async fn coder_run_create_inner(
state: AppState,
input: CoderRunCreateInput,
) -> Result<Response, StatusCode> {
if input.repo_binding.project_id.trim().is_empty()
|| input.repo_binding.workspace_id.trim().is_empty()
|| input.repo_binding.workspace_root.trim().is_empty()
|| input.repo_binding.repo_slug.trim().is_empty()
{
return Err(StatusCode::BAD_REQUEST);
}
if matches!(input.workflow_mode, CoderWorkflowMode::IssueTriage)
&& !matches!(
input.github_ref.as_ref().map(|row| &row.kind),
Some(CoderGithubRefKind::Issue)
)
{
return Err(StatusCode::BAD_REQUEST);
}
if matches!(input.workflow_mode, CoderWorkflowMode::IssueFix)
&& !matches!(
input.github_ref.as_ref().map(|row| &row.kind),
Some(CoderGithubRefKind::Issue)
)
{
return Err(StatusCode::BAD_REQUEST);
}
if matches!(input.workflow_mode, CoderWorkflowMode::PrReview)
&& !matches!(
input.github_ref.as_ref().map(|row| &row.kind),
Some(CoderGithubRefKind::PullRequest)
)
{
return Err(StatusCode::BAD_REQUEST);
}
if matches!(input.workflow_mode, CoderWorkflowMode::MergeRecommendation)
&& !matches!(
input.github_ref.as_ref().map(|row| &row.kind),
Some(CoderGithubRefKind::PullRequest)
)
{
return Err(StatusCode::BAD_REQUEST);
}
if matches!(
input.workflow_mode,
CoderWorkflowMode::IssueTriage | CoderWorkflowMode::IssueFix
) {
let readiness = coder_issue_triage_readiness(&state, &input).await?;
if !readiness.runnable {
return Ok((
StatusCode::CONFLICT,
Json(json!({
"error": if matches!(input.workflow_mode, CoderWorkflowMode::IssueFix) {
"Coder issue fix is not ready to run"
} else {
"Coder issue triage is not ready to run"
},
"code": "CODER_READINESS_BLOCKED",
"readiness": readiness,
})),
)
.into_response());
}
}
if matches!(input.workflow_mode, CoderWorkflowMode::PrReview) {
let readiness = coder_pr_review_readiness(&state, &input).await?;
if !readiness.runnable {
return Ok((
StatusCode::CONFLICT,
Json(json!({
"error": "Coder PR review is not ready to run",
"code": "CODER_READINESS_BLOCKED",
"readiness": readiness,
})),
)
.into_response());
}
}
if matches!(input.workflow_mode, CoderWorkflowMode::MergeRecommendation) {
let readiness = coder_merge_recommendation_readiness(&state, &input).await?;
if !readiness.runnable {
return Ok((
StatusCode::CONFLICT,
Json(json!({
"error": "Coder merge recommendation is not ready to run",
"code": "CODER_READINESS_BLOCKED",
"readiness": readiness,
})),
)
.into_response());
}
}
let now = crate::now_ms();
let coder_run_id = input
.coder_run_id
.clone()
.unwrap_or_else(|| format!("coder-{}", Uuid::new_v4().simple()));
let linked_context_run_id = format!("ctx-{coder_run_id}");
let create_input = ContextRunCreateInput {
run_id: Some(linked_context_run_id.clone()),
objective: match input.workflow_mode {
CoderWorkflowMode::IssueTriage => compose_issue_triage_objective(&input),
CoderWorkflowMode::IssueFix => compose_issue_fix_objective(&input),
CoderWorkflowMode::PrReview => compose_pr_review_objective(&input),
CoderWorkflowMode::MergeRecommendation => {
compose_merge_recommendation_objective(&input)
}
},
run_type: Some(input.workflow_mode.as_context_run_type().to_string()),
workspace: Some(derive_workspace(&input)),
source_client: normalize_source_client(input.source_client.as_deref())
.or_else(|| Some("coder_api".to_string())),
model_provider: normalize_source_client(input.model_provider.as_deref()),
model_id: normalize_source_client(input.model_id.as_deref()),
mcp_servers: input.mcp_servers.clone(),
};
let created = super::context_runs::context_run_create_impl(
state.clone(),
tandem_types::TenantContext::local_implicit(),
create_input,
)
.await?;
let _context_run: ContextRunState =
serde_json::from_value(created.0.get("run").cloned().unwrap_or_default())
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut record = CoderRunRecord {
coder_run_id: coder_run_id.clone(),
workflow_mode: input.workflow_mode.clone(),
linked_context_run_id: linked_context_run_id.clone(),
repo_binding: input.repo_binding,
github_ref: input.github_ref,
source_client: normalize_source_client(input.source_client.as_deref())
.or_else(|| Some("coder_api".to_string())),
model_provider: normalize_source_client(input.model_provider.as_deref()),
model_id: normalize_source_client(input.model_id.as_deref()),
parent_coder_run_id: input.parent_coder_run_id,
origin: normalize_source_client(input.origin.as_deref()),
origin_artifact_type: normalize_source_client(input.origin_artifact_type.as_deref()),
origin_policy: input.origin_policy,
github_project_ref: None,
remote_sync_state: None,
worker_session_id: None,
worker_run_id: None,
managed_worktree: None,
branch_name: None,
commit_sha: None,
pr_url: None,
changed_files: None,
validation_status: None,
handoff_status: None,
completion_gate: None,
created_at_ms: now,
updated_at_ms: now,
};
save_coder_run_record(&state, &record).await?;
let follow_on_duplicate_linkage =
maybe_write_follow_on_duplicate_linkage_candidate(&state, &record).await?;
match record.workflow_mode {
CoderWorkflowMode::IssueTriage => {
seed_issue_triage_tasks(state.clone(), &record).await?;
let memory_query = format!(
"{} issue #{}",
record.repo_binding.repo_slug,
record
.github_ref
.as_ref()
.map(|row| row.number)
.unwrap_or_default()
);
let memory_hits = collect_coder_memory_hits(&state, &record, &memory_query, 8).await?;
let duplicate_matches = derive_failure_pattern_duplicate_matches(&memory_hits, None, 3);
let artifact_id = format!("memory-hits-{}", Uuid::new_v4().simple());
let payload = json!({
"coder_run_id": record.coder_run_id,
"linked_context_run_id": record.linked_context_run_id,
"query": memory_query,
"hits": memory_hits,
"duplicate_candidates": duplicate_matches,
"created_at_ms": crate::now_ms(),
});
let artifact = write_coder_artifact(
&state,
&record.linked_context_run_id,
&artifact_id,
"coder_memory_hits",
"artifacts/memory_hits.json",
&payload,
)
.await?;
publish_coder_artifact_added(&state, &record, &artifact, Some("memory_retrieval"), {
let mut extra = serde_json::Map::new();
extra.insert("kind".to_string(), json!("memory_hits"));
extra.insert("query".to_string(), json!(memory_query));
extra
});
if !duplicate_matches.is_empty() {
let duplicate_artifact = write_coder_artifact(
&state,
&record.linked_context_run_id,
&format!("duplicate-matches-{}", Uuid::new_v4().simple()),
"coder_duplicate_matches",
"artifacts/duplicate_matches.json",
&json!({
"coder_run_id": record.coder_run_id,
"linked_context_run_id": record.linked_context_run_id,
"query": memory_query,
"matches": duplicate_matches,
"created_at_ms": crate::now_ms(),
}),
)
.await?;
publish_coder_artifact_added(
&state,
&record,
&duplicate_artifact,
Some("memory_retrieval"),
{
let mut extra = serde_json::Map::new();
extra.insert("kind".to_string(), json!("duplicate_matches"));
extra.insert("query".to_string(), json!(memory_query));
extra
},
);
}
let run = bootstrap_coder_workflow_run(
&state,
&record,
&["ingest_reference", "retrieve_memory"],
&["inspect_repo"],
"Inspect the repo, then attempt reproduction.",
)
.await?;
record.updated_at_ms = run.updated_at_ms;
save_coder_run_record(&state, &record).await?;
}
CoderWorkflowMode::IssueFix => {
seed_issue_fix_tasks(state.clone(), &record).await?;
let memory_query = default_coder_memory_query(&record);
let memory_hits = collect_coder_memory_hits(&state, &record, &memory_query, 8).await?;
let artifact = write_coder_artifact(
&state,
&record.linked_context_run_id,
&format!("issue-fix-memory-hits-{}", Uuid::new_v4().simple()),
"coder_memory_hits",
"artifacts/memory_hits.json",
&json!({
"coder_run_id": record.coder_run_id,
"linked_context_run_id": record.linked_context_run_id,
"query": memory_query,
"hits": memory_hits,
"created_at_ms": crate::now_ms(),
}),
)
.await?;
publish_coder_artifact_added(&state, &record, &artifact, Some("memory_retrieval"), {
let mut extra = serde_json::Map::new();
extra.insert("kind".to_string(), json!("memory_hits"));
extra.insert(
"query".to_string(),
json!(default_coder_memory_query(&record)),
);
extra
});
let run = bootstrap_coder_workflow_run(
&state,
&record,
&["retrieve_memory"],
&[],
"Inspect the issue context, then prepare and validate a constrained patch.",
)
.await?;
record.updated_at_ms = run.updated_at_ms;
save_coder_run_record(&state, &record).await?;
}
CoderWorkflowMode::PrReview => {
seed_pr_review_tasks(state.clone(), &record).await?;
let memory_query = default_coder_memory_query(&record);
let memory_hits = collect_coder_memory_hits(&state, &record, &memory_query, 8).await?;
let artifact = write_coder_artifact(
&state,
&record.linked_context_run_id,
&format!("pr-review-memory-hits-{}", Uuid::new_v4().simple()),
"coder_memory_hits",
"artifacts/memory_hits.json",
&json!({
"coder_run_id": record.coder_run_id,
"linked_context_run_id": record.linked_context_run_id,
"query": memory_query,
"hits": memory_hits,
"created_at_ms": crate::now_ms(),
}),
)
.await?;
publish_coder_artifact_added(&state, &record, &artifact, Some("memory_retrieval"), {
let mut extra = serde_json::Map::new();
extra.insert("kind".to_string(), json!("memory_hits"));
extra.insert(
"query".to_string(),
json!(default_coder_memory_query(&record)),
);
extra
});
let run = bootstrap_coder_workflow_run(
&state,
&record,
&["retrieve_memory"],
&[],
"Inspect the pull request, then analyze risk and requested changes.",
)
.await?;
record.updated_at_ms = run.updated_at_ms;
save_coder_run_record(&state, &record).await?;
}
CoderWorkflowMode::MergeRecommendation => {
seed_merge_recommendation_tasks(state.clone(), &record).await?;
let memory_query = default_coder_memory_query(&record);
let memory_hits = collect_coder_memory_hits(&state, &record, &memory_query, 8).await?;
let artifact = write_coder_artifact(
&state,
&record.linked_context_run_id,
&format!(
"merge-recommendation-memory-hits-{}",
Uuid::new_v4().simple()
),
"coder_memory_hits",
"artifacts/memory_hits.json",
&json!({
"coder_run_id": record.coder_run_id,
"linked_context_run_id": record.linked_context_run_id,
"query": memory_query,
"hits": memory_hits,
"created_at_ms": crate::now_ms(),
}),
)
.await?;
publish_coder_artifact_added(&state, &record, &artifact, Some("memory_retrieval"), {
let mut extra = serde_json::Map::new();
extra.insert("kind".to_string(), json!("memory_hits"));
extra.insert(
"query".to_string(),
json!(default_coder_memory_query(&record)),
);
extra
});
let run = bootstrap_coder_workflow_run(
&state,
&record,
&["retrieve_memory"],
&[],
"Inspect the pull request, then assess merge readiness.",
)
.await?;
record.updated_at_ms = run.updated_at_ms;
save_coder_run_record(&state, &record).await?;
}
}
let final_run = load_context_run_state(&state, &linked_context_run_id).await?;
maybe_sync_github_project_status(&state, &mut record, &final_run).await?;
publish_coder_run_event(
&state,
"coder.run.created",
&record,
Some(project_coder_phase(&final_run)),
serde_json::Map::new(),
);
Ok(Json(json!({
"ok": true,
"coder_run": coder_run_payload(&record, &final_run),
"generated_candidates": follow_on_duplicate_linkage
.map(|candidate| vec![candidate])
.unwrap_or_default(),
"execution_policy": coder_execution_policy_summary(&state, &record).await?,
"merge_submit_policy": coder_merge_submit_policy_summary(&state, &record).await?,
"run": final_run,
}))
.into_response())
}
pub(super) async fn coder_run_create(
State(state): State<AppState>,
Json(input): Json<CoderRunCreateInput>,
) -> Result<Response, StatusCode> {
coder_run_create_inner(state, input).await
}