1use std::fs::{self, File, OpenOptions};
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use claude_agent_sdk_rs::{ClaudeClient, ContentBlock, Message, ResultMessage, ToolResultContent};
15use coda_pm::PromptManager;
16use futures::StreamExt;
17use serde::{Deserialize, Serialize};
18use tokio::sync::mpsc::UnboundedSender;
19use tracing::{debug, error, info, warn};
20
21use crate::CoreError;
22use crate::config::CodaConfig;
23use crate::gh::GhOps;
24use crate::git::GitOps;
25use crate::parser::{
26 extract_pr_number, extract_pr_url, parse_review_issues, parse_verification_result,
27};
28use crate::profile::AgentProfile;
29use crate::state::{FeatureState, FeatureStatus, PhaseKind, PhaseStatus};
30use crate::task::{Task, TaskResult, TaskStatus};
31
/// Progress notifications emitted by [`Runner`] while it executes a feature.
///
/// Events are delivered through the sender registered with
/// [`Runner::set_progress_sender`]; when no sender is registered, or the
/// receiving side has been dropped, events are silently discarded.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum RunEvent {
    /// The run is about to start executing phases.
    RunStarting {
        /// Names of all phases recorded in the feature state, in execution
        /// order (including phases skipped because they already completed).
        phases: Vec<String>,
    },
    /// A phase is about to run.
    PhaseStarting {
        /// Phase name.
        name: String,
        /// Zero-based index of the phase.
        index: usize,
        /// Total number of phases in the feature state.
        total: usize,
    },
    /// A phase finished successfully.
    PhaseCompleted {
        /// Phase name.
        name: String,
        /// Zero-based index of the phase.
        index: usize,
        /// Wall-clock time spent in the phase.
        duration: Duration,
        /// Agent turns attributed to the phase.
        turns: u32,
        /// Cost attributed to the phase, in US dollars.
        cost_usd: f64,
    },
    /// A phase failed; the run is aborted right after this event.
    PhaseFailed {
        /// Phase name.
        name: String,
        /// Zero-based index of the phase.
        index: usize,
        /// Display form of the error that failed the phase.
        error: String,
    },
    /// One code-review round has finished.
    ReviewRound {
        /// One-based round number.
        round: u32,
        /// Configured maximum number of review rounds.
        max_rounds: u32,
        /// Number of issues extracted from this round's review response.
        issues_found: u32,
    },
    /// One verification attempt has finished.
    VerifyAttempt {
        /// One-based attempt number.
        attempt: u32,
        /// Total attempts allowed (configured retries plus the first attempt).
        max_attempts: u32,
        /// Whether the attempt reported no failed checks.
        passed: bool,
    },
    /// All phases are done and PR creation is starting.
    CreatingPr,
    /// The PR-creation step has finished.
    PrCreated {
        /// URL of the PR recorded in the feature state, or `None` if no URL
        /// could be determined.
        url: Option<String>,
    },
    /// The run reached its end after the PR step. Not emitted when the run
    /// aborts with an error.
    RunFinished {
        /// `true` when PR creation succeeded.
        success: bool,
    },
}
106
/// Task results of a run paired with an overall success flag.
///
/// NOTE(review): not constructed anywhere in the visible part of this file;
/// presumably built by callers of `Runner::execute` — confirm.
#[derive(Debug)]
pub struct RunProgress {
    /// Results of the tasks executed during the run.
    pub results: Vec<TaskResult>,
    /// Whether the run as a whole is considered successful.
    pub success: bool,
}
118
/// One commit from a `log_oneline` listing, as passed to the PR prompt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitInfo {
    /// Commit identifier: the text before the first space of the log line.
    pub sha: String,
    /// Commit subject: everything after the first space (may be empty).
    pub message: String,
}
127
/// Statistics accumulated by the review phase and passed to the PR prompt.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReviewSummary {
    /// Number of review rounds performed.
    pub rounds: u32,
    /// Total issues reported across all rounds.
    pub issues_found: u32,
    /// Issues handed to the agent for fixing. A fix is not re-verified
    /// before being counted here.
    pub issues_resolved: u32,
}
138
/// Check counts from the most recent verification attempt.
///
/// Each attempt overwrites both fields, so after the verify phase they
/// describe the final attempt only.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct VerificationSummary {
    /// Checks reported as passing.
    pub checks_passed: u32,
    /// Passing checks plus failing checks.
    pub checks_total: u32,
}
147
/// Metrics attributed to a single agent interaction.
///
/// Each value is the increase since the previous `MetricsTracker::record` call.
#[derive(Debug, Clone, Copy, Default)]
struct IncrementalMetrics {
    /// Cost increase in US dollars (never negative).
    cost_usd: f64,
    /// Input-token increase.
    input_tokens: u64,
    /// Output-token increase.
    output_tokens: u64,
}
161
/// Last running totals seen in agent `ResultMessage`s.
///
/// `record` subtracts these from each new report to derive
/// [`IncrementalMetrics`] for a single interaction.
#[derive(Debug, Default)]
struct MetricsTracker {
    /// Last seen total cost in US dollars.
    cumulative_cost_usd: f64,
    /// Last seen input-token total.
    cumulative_input_tokens: u64,
    /// Last seen output-token total.
    cumulative_output_tokens: u64,
}
176
177impl MetricsTracker {
178 fn record(&mut self, result: &Option<ResultMessage>) -> IncrementalMetrics {
180 let new_cost = result
181 .as_ref()
182 .and_then(|r| r.total_cost_usd)
183 .unwrap_or(self.cumulative_cost_usd);
184 let cost_delta = (new_cost - self.cumulative_cost_usd).max(0.0);
185 self.cumulative_cost_usd = new_cost;
186
187 let (new_input, new_output) = result
188 .as_ref()
189 .and_then(|r| r.usage.as_ref())
190 .map(|u| {
191 let input = u.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
192 let output = u.get("output_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
193 (input, output)
194 })
195 .unwrap_or((self.cumulative_input_tokens, self.cumulative_output_tokens));
196
197 let input_delta = new_input.saturating_sub(self.cumulative_input_tokens);
198 let output_delta = new_output.saturating_sub(self.cumulative_output_tokens);
199 self.cumulative_input_tokens = new_input;
200 self.cumulative_output_tokens = new_output;
201
202 IncrementalMetrics {
203 cost_usd: cost_delta,
204 input_tokens: input_delta,
205 output_tokens: output_delta,
206 }
207 }
208}
209
/// Final metrics and phase-specific details of a finished phase.
///
/// `Runner::complete_phase` copies these values into the phase's persisted
/// state.
#[derive(Debug)]
struct PhaseOutcome {
    /// Agent turns used by the phase.
    turns: u32,
    /// Cost of the phase in US dollars.
    cost_usd: f64,
    /// Input tokens used by the phase.
    input_tokens: u64,
    /// Output tokens used by the phase.
    output_tokens: u64,
    /// Wall-clock duration of the phase.
    duration: Duration,
    /// Free-form JSON details (e.g. review rounds or verification counts).
    details: serde_json::Value,
}
230
/// Sums the metrics of every agent interaction made within one phase.
#[derive(Debug)]
struct PhaseMetricsAccumulator {
    /// When the accumulator was created; used for the phase duration.
    start: Instant,
    /// Turns accumulated so far.
    turns: u32,
    /// Cost accumulated so far, in US dollars.
    cost_usd: f64,
    /// Input tokens accumulated so far.
    input_tokens: u64,
    /// Output tokens accumulated so far.
    output_tokens: u64,
}
248
249impl PhaseMetricsAccumulator {
250 fn new() -> Self {
252 Self {
253 start: Instant::now(),
254 turns: 0,
255 cost_usd: 0.0,
256 input_tokens: 0,
257 output_tokens: 0,
258 }
259 }
260
261 fn record(&mut self, resp: &AgentResponse, metrics: IncrementalMetrics) {
263 self.turns += resp.result.as_ref().map_or(1, |r| r.num_turns);
264 self.cost_usd += metrics.cost_usd;
265 self.input_tokens += metrics.input_tokens;
266 self.output_tokens += metrics.output_tokens;
267 }
268
269 fn into_outcome(self, details: serde_json::Value) -> PhaseOutcome {
271 PhaseOutcome {
272 turns: self.turns,
273 cost_usd: self.cost_usd,
274 input_tokens: self.input_tokens,
275 output_tokens: self.output_tokens,
276 duration: self.start.elapsed(),
277 details,
278 }
279 }
280}
281
/// Everything collected from the agent for a single prompt.
#[derive(Debug, Default)]
struct AgentResponse {
    /// Concatenated text blocks from assistant messages.
    text: String,
    /// Concatenated text of tool-result blocks (from assistant or user messages).
    tool_output: String,
    /// The final `ResultMessage`, if one was received before the stream ended.
    result: Option<ResultMessage>,
}
295
296impl AgentResponse {
297 fn all_text(&self) -> String {
299 if self.tool_output.is_empty() {
300 self.text.clone()
301 } else {
302 format!("{}\n{}", self.text, self.tool_output)
303 }
304 }
305}
306
/// Best-effort, human-readable log of a single run, stored under
/// `<feature_dir>/logs/run-<timestamp>.log`.
///
/// Write errors are ignored so that logging can never fail a run.
struct RunLogger {
    /// Open log file; each write goes directly to the file (no buffering layer).
    file: File,
}
315
316impl RunLogger {
317 fn new(feature_dir: &Path) -> Option<Self> {
322 let logs_dir = feature_dir.join("logs");
323 if let Err(e) = fs::create_dir_all(&logs_dir) {
324 warn!(error = %e, "Cannot create logs directory");
325 return None;
326 }
327
328 let timestamp = chrono::Utc::now().format("%Y%m%dT%H%M%S");
329 let log_path = logs_dir.join(format!("run-{timestamp}.log"));
330
331 match OpenOptions::new().create(true).append(true).open(&log_path) {
332 Ok(file) => {
333 info!(path = %log_path.display(), "Run log opened");
334 Some(Self { file })
335 }
336 Err(e) => {
337 warn!(error = %e, "Cannot open run log file");
338 None
339 }
340 }
341 }
342
343 fn log_header(&mut self, feature_slug: &str, model: &str, phases: &[String]) {
345 let _ = writeln!(self.file, "═══ CODA Run: {feature_slug} ═══");
346 let _ = writeln!(
347 self.file,
348 "Started: {}",
349 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ")
350 );
351 let _ = writeln!(self.file, "Model: {model}");
352 let _ = writeln!(self.file, "Phases: {}", phases.join(" → "));
353 let _ = writeln!(self.file);
354 }
355
356 fn log_phase_start(&mut self, name: &str, index: usize, total: usize, kind: &str) {
358 let _ = writeln!(self.file, "────────────────────────────────");
359 let _ = writeln!(self.file, "Phase {}/{total}: {name} [{kind}]", index + 1,);
360 let _ = writeln!(self.file, "────────────────────────────────");
361 let _ = writeln!(self.file);
362 }
363
364 fn log_interaction(
366 &mut self,
367 prompt: &str,
368 resp: &AgentResponse,
369 metrics: &IncrementalMetrics,
370 ) {
371 let _ = writeln!(self.file, ">>> PROMPT ({} chars)", prompt.len());
372 let truncated_prompt = truncate_for_log(prompt, LOG_TEXT_LIMIT);
373 let _ = writeln!(self.file, "{truncated_prompt}");
374 if prompt.len() > LOG_TEXT_LIMIT {
375 let _ = writeln!(self.file, "... [truncated at {LOG_TEXT_LIMIT} chars]");
376 }
377 let _ = writeln!(self.file);
378
379 let _ = writeln!(
380 self.file,
381 "<<< RESPONSE (text: {} chars, tool_output: {} chars)",
382 resp.text.len(),
383 resp.tool_output.len(),
384 );
385
386 if resp.text.is_empty() && resp.tool_output.is_empty() {
387 let _ = writeln!(self.file, "⚠ WARNING: Empty response from agent");
388 } else {
389 if !resp.text.is_empty() {
390 let _ = writeln!(self.file, "[text]");
391 let truncated = truncate_for_log(&resp.text, LOG_TEXT_LIMIT);
392 let _ = writeln!(self.file, "{truncated}");
393 if resp.text.len() > LOG_TEXT_LIMIT {
394 let _ = writeln!(self.file, "... [truncated at {LOG_TEXT_LIMIT} chars]");
395 }
396 }
397 if !resp.tool_output.is_empty() {
398 let _ = writeln!(self.file, "[tool_output]");
399 let truncated = truncate_for_log(&resp.tool_output, LOG_TEXT_LIMIT);
400 let _ = writeln!(self.file, "{truncated}");
401 if resp.tool_output.len() > LOG_TEXT_LIMIT {
402 let _ = writeln!(self.file, "... [truncated at {LOG_TEXT_LIMIT} chars]");
403 }
404 }
405 }
406
407 let _ = writeln!(
408 self.file,
409 "[metrics] turns={}, cost=${:.4}, input_tokens={}, output_tokens={}",
410 resp.result.as_ref().map_or(0, |r| r.num_turns),
411 metrics.cost_usd,
412 metrics.input_tokens,
413 metrics.output_tokens,
414 );
415 let _ = writeln!(self.file);
416 }
417
418 fn log_pr_extraction(
420 &mut self,
421 text_result: Option<&str>,
422 gh_result: Option<&str>,
423 final_url: Option<&str>,
424 ) {
425 let _ = writeln!(self.file, "[PR extraction]");
426 let _ = writeln!(
427 self.file,
428 " extract_pr_url(all_text) → {}",
429 text_result.unwrap_or("None"),
430 );
431 let _ = writeln!(
432 self.file,
433 " check_pr_exists_via_gh → {}",
434 gh_result.unwrap_or("not attempted"),
435 );
436 let _ = writeln!(
437 self.file,
438 " Result: {}",
439 final_url
440 .map(|u| format!("OK → {u}"))
441 .unwrap_or_else(|| "FAILED — no PR URL found".to_string()),
442 );
443 let _ = writeln!(self.file);
444 }
445
446 fn log_message(&mut self, msg: &str) {
448 let _ = writeln!(self.file, "{msg}");
449 }
450}
451
/// Maximum number of bytes of a single prompt or response body written to the
/// run log.
const LOG_TEXT_LIMIT: usize = 50_000;

/// Returns the longest prefix of `text` that is at most `limit` bytes long and
/// does not split a UTF-8 character.
///
/// Text that already fits is returned unchanged. Otherwise the cut point moves
/// backwards from `limit` until it lands on a character boundary.
fn truncate_for_log(text: &str, limit: usize) -> &str {
    if text.len() <= limit {
        return text;
    }
    // Offset 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=limit)
        .rev()
        .find(|&offset| text.is_char_boundary(offset))
        .unwrap_or(0);
    &text[..cut]
}
466}
467
/// Executes one feature end to end.
///
/// Runs the feature's development and quality phases through a
/// [`ClaudeClient`] session, persists progress to the feature's `state.yml`,
/// and finally asks the agent to open a pull request.
pub struct Runner {
    /// Agent client; created in `new`, connected in `execute`.
    client: ClaudeClient,
    /// Renders the `run/*` prompt templates.
    pm: PromptManager,
    /// Project configuration (agent limits, checks, review settings).
    config: CodaConfig,
    /// In-memory copy of the feature's `state.yml`.
    state: FeatureState,
    /// Path that `state` is saved to.
    state_path: PathBuf,
    /// Git worktree the agent works in; specs are read from here too.
    worktree_path: PathBuf,
    /// Whether `client` is currently connected.
    connected: bool,
    /// Statistics from the review phase; passed to the PR prompt.
    review_summary: ReviewSummary,
    /// Latest verification counts; used to derive the PR's draft flag.
    verification_summary: VerificationSummary,
    /// Optional receiver of [`RunEvent`]s.
    progress_tx: Option<UnboundedSender<RunEvent>>,
    /// Converts cumulative session metrics into per-interaction deltas.
    metrics: MetricsTracker,
    /// Per-run log file; `None` if it could not be opened.
    run_logger: Option<RunLogger>,
    /// Git operations (add/commit/diff/log/push).
    git: Arc<dyn GitOps>,
    /// GitHub CLI operations; used as a fallback to discover the PR URL.
    gh: Arc<dyn GhOps>,
}
492
493impl Runner {
494 pub fn new(
504 feature_slug: &str,
505 project_root: PathBuf,
506 pm: &PromptManager,
507 config: &CodaConfig,
508 git: Arc<dyn GitOps>,
509 gh: Arc<dyn GhOps>,
510 ) -> Result<Self, CoreError> {
511 let feature_dir = find_feature_dir(&project_root, feature_slug)?;
513 let state_path = feature_dir.join("state.yml");
514
515 let state_content = std::fs::read_to_string(&state_path)
517 .map_err(|e| CoreError::StateError(format!("Cannot read state.yml: {e}")))?;
518 let state: FeatureState = serde_yaml::from_str(&state_content)?;
519
520 state.validate().map_err(|e| {
521 CoreError::StateError(format!(
522 "Invalid state.yml at {}: {e}",
523 state_path.display()
524 ))
525 })?;
526
527 let worktree_path = project_root.join(&state.git.worktree_path);
528
529 let coda_md = std::fs::read_to_string(project_root.join(".coda.md")).unwrap_or_default();
531 let system_prompt = pm.render("run/system", minijinja::context!(coda_md => coda_md))?;
532
533 let options = AgentProfile::Coder.to_options(
535 &system_prompt,
536 worktree_path.clone(),
537 config.agent.max_turns,
538 config.agent.max_budget_usd,
539 &config.agent.model,
540 );
541
542 let client = ClaudeClient::new(options);
543 let run_logger = RunLogger::new(&feature_dir);
544
545 Ok(Self {
546 client,
547 pm: pm.clone(),
548 config: config.clone(),
549 state,
550 state_path,
551 worktree_path,
552 connected: false,
553 review_summary: ReviewSummary::default(),
554 verification_summary: VerificationSummary::default(),
555 progress_tx: None,
556 metrics: MetricsTracker::default(),
557 run_logger,
558 git,
559 gh,
560 })
561 }
562
563 pub fn set_progress_sender(&mut self, tx: UnboundedSender<RunEvent>) {
568 self.progress_tx = Some(tx);
569 }
570
571 pub async fn execute(&mut self) -> Result<Vec<TaskResult>, CoreError> {
583 self.client
585 .connect()
586 .await
587 .map_err(|e| CoreError::AgentError(e.to_string()))?;
588 self.connected = true;
589
590 self.state.status = FeatureStatus::InProgress;
592 self.save_state()?;
593
594 let mut results = Vec::new();
595 let total_phases = self.state.phases.len();
596
597 let start_phase = self
601 .state
602 .phases
603 .iter()
604 .position(|p| p.status != PhaseStatus::Completed)
605 .unwrap_or(total_phases);
606
607 if start_phase < total_phases {
609 self.state.current_phase = start_phase as u32;
610 }
611
612 if start_phase > 0 {
613 info!(
614 start_phase = start_phase,
615 total = total_phases,
616 "Resuming from phase {} (skipping {} completed)",
617 self.state
618 .phases
619 .get(start_phase)
620 .map_or("create-pr", |p| p.name.as_str()),
621 start_phase,
622 );
623 }
624
625 let phase_names: Vec<String> = self.state.phases.iter().map(|p| p.name.clone()).collect();
627 self.emit_event(RunEvent::RunStarting {
628 phases: phase_names.clone(),
629 });
630
631 if let Some(logger) = &mut self.run_logger {
632 logger.log_header(
633 &self.state.feature.slug,
634 &self.config.agent.model,
635 &phase_names,
636 );
637 }
638
639 for phase_idx in start_phase..total_phases {
640 let phase_name = self.state.phases[phase_idx].name.clone();
641 let phase_kind = self.state.phases[phase_idx].kind.clone();
642
643 info!(phase = %phase_name, index = phase_idx, "Starting phase");
644
645 let kind_str = match &phase_kind {
646 PhaseKind::Dev => "dev",
647 PhaseKind::Quality => "quality",
648 };
649 if let Some(logger) = &mut self.run_logger {
650 logger.log_phase_start(&phase_name, phase_idx, total_phases, kind_str);
651 }
652
653 self.emit_event(RunEvent::PhaseStarting {
654 name: phase_name.clone(),
655 index: phase_idx,
656 total: total_phases,
657 });
658
659 let result = match phase_kind {
660 PhaseKind::Dev => self.run_dev_phase(phase_idx).await,
661 PhaseKind::Quality => match phase_name.as_str() {
662 "review" => self.run_review(phase_idx).await,
663 "verify" => self.run_verify(phase_idx).await,
664 _ => Err(CoreError::AgentError(format!(
665 "Unknown quality phase: {phase_name}"
666 ))),
667 },
668 };
669
670 match result {
671 Ok(task_result) => {
672 info!(
673 phase = %phase_name,
674 turns = task_result.turns,
675 cost_usd = task_result.cost_usd,
676 "Phase completed"
677 );
678 self.emit_event(RunEvent::PhaseCompleted {
679 name: phase_name.clone(),
680 index: phase_idx,
681 duration: task_result.duration,
682 turns: task_result.turns,
683 cost_usd: task_result.cost_usd,
684 });
685 results.push(task_result);
686
687 self.state.current_phase = ((phase_idx + 1).min(total_phases)) as u32;
689 self.save_state()?;
690 }
691 Err(e) => {
692 error!(phase = %phase_name, error = %e, "Phase failed");
693 if let Some(logger) = &mut self.run_logger {
694 logger.log_message(&format!(
695 "✗ Phase {phase_name} FAILED: {e}\n Aborting run.\n",
696 ));
697 }
698 self.emit_event(RunEvent::PhaseFailed {
699 name: phase_name.clone(),
700 index: phase_idx,
701 error: e.to_string(),
702 });
703 self.state.phases[phase_idx].status = PhaseStatus::Failed;
704 self.state.status = FeatureStatus::Failed;
705 self.save_state()?;
706 return Err(e);
707 }
708 }
709 }
710
711 self.update_totals();
714 self.save_state()?;
715 self.commit_coda_state()?;
716
717 info!("All phases complete, creating PR...");
719 self.emit_event(RunEvent::CreatingPr);
720 let pr_result = self.create_pr().await?;
721
722 let pr_url = self.state.pr.as_ref().map(|pr| pr.url.clone());
724 self.emit_event(RunEvent::PrCreated { url: pr_url });
725
726 let pr_succeeded = matches!(pr_result.status, TaskStatus::Completed);
727 results.push(pr_result);
728
729 if pr_succeeded {
731 self.state.status = FeatureStatus::Completed;
732 } else {
733 warn!("Feature development complete but PR creation failed");
736 }
737 self.update_totals();
738 self.save_state()?;
739
740 self.commit_coda_state()?;
743 let branch = &self.state.git.branch.clone();
744 self.git.push(&self.worktree_path, branch)?;
745
746 self.emit_event(RunEvent::RunFinished {
747 success: pr_succeeded,
748 });
749
750 if self.connected {
752 let _ = self.client.disconnect().await;
753 self.connected = false;
754 }
755
756 Ok(results)
757 }
758
759 async fn run_dev_phase(&mut self, phase_idx: usize) -> Result<TaskResult, CoreError> {
764 let was_running = self.state.phases[phase_idx].status == PhaseStatus::Running;
765 self.mark_phase_running(phase_idx);
766
767 let mut acc = PhaseMetricsAccumulator::new();
768
769 let design_spec = self.load_spec("design.md")?;
770 let checks = &self.config.checks;
771 let feature_slug = self.state.feature.slug.clone();
772 let phase_name = self.state.phases[phase_idx].name.clone();
773
774 let dev_phase_number = self
776 .state
777 .phases
778 .iter()
779 .take(phase_idx + 1)
780 .filter(|p| p.kind == PhaseKind::Dev)
781 .count();
782 let total_dev_phases = self
783 .state
784 .phases
785 .iter()
786 .filter(|p| p.kind == PhaseKind::Dev)
787 .count();
788 let is_first = phase_idx == 0;
789
790 let resume_context = if was_running {
792 self.build_resume_context()?
793 } else {
794 String::new()
795 };
796
797 let prompt = self.pm.render(
798 "run/dev_phase",
799 minijinja::context!(
800 design_spec => design_spec,
801 phase_name => phase_name,
802 phase_number => dev_phase_number,
803 total_dev_phases => total_dev_phases,
804 is_first => is_first,
805 checks => checks,
806 feature_slug => feature_slug,
807 resume_context => resume_context,
808 ),
809 )?;
810
811 let resp = self.send_and_collect(&prompt).await?;
812 let incremental = self.metrics.record(&resp.result);
813 if let Some(logger) = &mut self.run_logger {
814 logger.log_interaction(&prompt, &resp, &incremental);
815 }
816 acc.record(&resp, incremental);
817
818 let outcome = acc.into_outcome(serde_json::json!({}));
819 let task_result = TaskResult {
820 task: Task::DevPhase {
821 name: phase_name,
822 feature_slug: self.state.feature.slug.clone(),
823 },
824 status: TaskStatus::Completed,
825 turns: outcome.turns,
826 cost_usd: outcome.cost_usd,
827 duration: outcome.duration,
828 artifacts: vec![],
829 };
830 self.complete_phase(phase_idx, outcome);
831
832 Ok(task_result)
833 }
834
835 async fn run_review(&mut self, phase_idx: usize) -> Result<TaskResult, CoreError> {
841 self.mark_phase_running(phase_idx);
842
843 if !self.config.review.enabled {
844 info!("Code review disabled, skipping");
845 let outcome = PhaseOutcome {
846 turns: 0,
847 cost_usd: 0.0,
848 input_tokens: 0,
849 output_tokens: 0,
850 duration: Duration::ZERO,
851 details: serde_json::json!({}),
852 };
853 let task_result = TaskResult {
854 task: Task::Review {
855 feature_slug: self.state.feature.slug.clone(),
856 },
857 status: TaskStatus::Completed,
858 turns: 0,
859 cost_usd: 0.0,
860 duration: Duration::ZERO,
861 artifacts: vec![],
862 };
863 self.complete_phase(phase_idx, outcome);
864 return Ok(task_result);
865 }
866
867 let design_spec = self.load_spec("design.md")?;
868 let max_rounds = self.config.review.max_review_rounds;
869 let mut acc = PhaseMetricsAccumulator::new();
870
871 for round in 0..max_rounds {
872 info!(round = round + 1, max = max_rounds, "Review round");
873
874 let diff = self.get_diff()?;
875 let review_prompt = self.pm.render(
876 "run/review",
877 minijinja::context!(
878 design_spec => design_spec,
879 diff => diff,
880 ),
881 )?;
882
883 let resp = self.send_and_collect(&review_prompt).await?;
884 let m = self.metrics.record(&resp.result);
885 if let Some(logger) = &mut self.run_logger {
886 logger.log_interaction(&review_prompt, &resp, &m);
887 }
888 acc.record(&resp, m);
889
890 self.review_summary.rounds += 1;
891
892 let issues = parse_review_issues(&resp.text);
894 let issue_count = issues.len() as u32;
895 self.review_summary.issues_found += issue_count;
896
897 self.emit_event(RunEvent::ReviewRound {
898 round: round + 1,
899 max_rounds,
900 issues_found: issue_count,
901 });
902
903 if issues.is_empty() {
904 info!("No critical/major issues found, review passed");
905 break;
906 }
907
908 info!(issues = issue_count, "Found issues, asking agent to fix");
909
910 let issues_list = issues
912 .iter()
913 .enumerate()
914 .map(|(i, issue)| format!("{}. {}", i + 1, issue))
915 .collect::<Vec<_>>()
916 .join("\n");
917 let fix_prompt = format!(
918 "The code review found {issue_count} critical/major issues that must be fixed.\n\n\
919 ## Issues\n\n{issues_list}\n\n\
920 ## Instructions\n\n\
921 1. Fix each issue listed above\n\
922 2. Run the configured checks to ensure nothing is broken\n\
923 3. Commit the fixes with a descriptive message\n\n\
924 Refer to the design specification provided earlier for the intended behavior.",
925 );
926
927 let fix_resp = self.send_and_collect(&fix_prompt).await?;
928 let fm = self.metrics.record(&fix_resp.result);
929 if let Some(logger) = &mut self.run_logger {
930 logger.log_interaction(&fix_prompt, &fix_resp, &fm);
931 }
932 acc.record(&fix_resp, fm);
933
934 self.review_summary.issues_resolved += issue_count;
935 }
936
937 let outcome = acc.into_outcome(serde_json::json!({
938 "rounds": self.review_summary.rounds,
939 "issues_found": self.review_summary.issues_found,
940 "issues_resolved": self.review_summary.issues_resolved,
941 }));
942 let task_result = TaskResult {
943 task: Task::Review {
944 feature_slug: self.state.feature.slug.clone(),
945 },
946 status: TaskStatus::Completed,
947 turns: outcome.turns,
948 cost_usd: outcome.cost_usd,
949 duration: outcome.duration,
950 artifacts: vec![],
951 };
952 self.complete_phase(phase_idx, outcome);
953
954 Ok(task_result)
955 }
956
957 async fn run_verify(&mut self, phase_idx: usize) -> Result<TaskResult, CoreError> {
962 self.mark_phase_running(phase_idx);
963
964 let verification_spec = self.load_spec("verification.md")?;
965 let checks = self.config.checks.clone();
966 let max_attempts = self.config.agent.max_retries;
967 let mut acc = PhaseMetricsAccumulator::new();
968
969 for attempt in 0..=max_attempts {
970 info!(
971 attempt = attempt + 1,
972 max = max_attempts + 1,
973 "Verification attempt"
974 );
975
976 let verify_prompt = self.pm.render(
977 "run/verify",
978 minijinja::context!(
979 verification_spec => verification_spec,
980 checks => &checks,
981 ),
982 )?;
983
984 let resp = self.send_and_collect(&verify_prompt).await?;
985 let m = self.metrics.record(&resp.result);
986 if let Some(logger) = &mut self.run_logger {
987 logger.log_interaction(&verify_prompt, &resp, &m);
988 }
989 acc.record(&resp, m);
990
991 let (passed, failed_details) = parse_verification_result(&resp.text);
993 self.verification_summary.checks_total = passed + failed_details.len() as u32;
994 self.verification_summary.checks_passed = passed;
995
996 let all_passed = failed_details.is_empty();
997 self.emit_event(RunEvent::VerifyAttempt {
998 attempt: attempt + 1,
999 max_attempts: max_attempts + 1,
1000 passed: all_passed,
1001 });
1002
1003 if all_passed {
1004 info!("All verification checks passed");
1005 break;
1006 }
1007
1008 if attempt == max_attempts {
1009 warn!("Max verification attempts reached, proceeding with failures");
1010 break;
1011 }
1012
1013 info!(
1014 failures = failed_details.len(),
1015 "Verification failed, asking agent to fix"
1016 );
1017
1018 let failures = failed_details.join("\n");
1019 let checks_str = checks.join("`, `");
1020 let fix_prompt = format!(
1021 "Verification failed. The following checks did not pass:\n\n\
1022 ## Failed Checks\n\n{failures}\n\n\
1023 ## Instructions\n\n\
1024 1. Analyze each failure and identify the root cause\n\
1025 2. Fix the code to address each failure\n\
1026 3. Re-run all checks: `{checks_str}`\n\
1027 4. Ensure ALL checks pass before reporting back\n\n\
1028 Refer to the design specification and verification plan provided earlier.",
1029 );
1030
1031 let fix_resp = self.send_and_collect(&fix_prompt).await?;
1032 let fm = self.metrics.record(&fix_resp.result);
1033 if let Some(logger) = &mut self.run_logger {
1034 logger.log_interaction(&fix_prompt, &fix_resp, &fm);
1035 }
1036 acc.record(&fix_resp, fm);
1037 }
1038
1039 let outcome = acc.into_outcome(serde_json::json!({
1040 "attempts": self.verification_summary.checks_total,
1041 "checks_passed": self.verification_summary.checks_passed,
1042 "checks_total": self.verification_summary.checks_total,
1043 }));
1044 let task_result = TaskResult {
1045 task: Task::Verify {
1046 feature_slug: self.state.feature.slug.clone(),
1047 },
1048 status: TaskStatus::Completed,
1049 turns: outcome.turns,
1050 cost_usd: outcome.cost_usd,
1051 duration: outcome.duration,
1052 artifacts: vec![],
1053 };
1054 self.complete_phase(phase_idx, outcome);
1055
1056 Ok(task_result)
1057 }
1058
1059 async fn create_pr(&mut self) -> Result<TaskResult, CoreError> {
1068 let design_spec = self.load_spec("design.md")?;
1069 let commits = self.get_commits()?;
1070 let checks = &self.config.checks;
1071 let start = Instant::now();
1072
1073 let all_checks_passed = self.verification_summary.checks_passed
1074 == self.verification_summary.checks_total
1075 && self.verification_summary.checks_total > 0;
1076 let is_draft = !all_checks_passed;
1077 let model = &self.config.agent.model;
1078 let coda_version = env!("CARGO_PKG_VERSION");
1079
1080 let pr_prompt = self.pm.render(
1081 "run/create_pr",
1082 minijinja::context!(
1083 design_spec => design_spec,
1084 commits => commits,
1085 state => &self.state,
1086 checks => checks,
1087 review_summary => &self.review_summary,
1088 verification_summary => &self.verification_summary,
1089 all_checks_passed => all_checks_passed,
1090 is_draft => is_draft,
1091 model => model,
1092 coda_version => coda_version,
1093 ),
1094 )?;
1095
1096 let resp = self.send_and_collect(&pr_prompt).await?;
1097 let pr_metrics = self.metrics.record(&resp.result);
1098 if let Some(logger) = &mut self.run_logger {
1099 logger.log_interaction(&pr_prompt, &resp, &pr_metrics);
1100 }
1101
1102 let all_text = resp.all_text();
1104 let url_from_text = extract_pr_url(&all_text);
1105
1106 let url_from_gh = if url_from_text.is_none() {
1107 info!("PR URL not found in agent response, checking via gh CLI...");
1108 self.check_pr_exists_via_gh()
1109 } else {
1110 None
1111 };
1112
1113 let pr_url = url_from_text.clone().or(url_from_gh.clone());
1114
1115 if let Some(logger) = &mut self.run_logger {
1116 logger.log_pr_extraction(
1117 url_from_text.as_deref(),
1118 url_from_gh.as_deref(),
1119 pr_url.as_deref(),
1120 );
1121 }
1122
1123 let status = if let Some(ref url) = pr_url {
1124 info!(url = %url, "PR created");
1125 self.state.pr = Some(crate::state::PrInfo {
1126 url: url.clone(),
1127 number: extract_pr_number(url).unwrap_or(0),
1128 title: format!("feat({}): feature implementation", self.state.feature.slug),
1129 });
1130 self.save_state()?;
1131 TaskStatus::Completed
1132 } else {
1133 let msg = "PR creation failed: no PR URL found in agent response or via gh CLI";
1134 warn!(msg);
1135 TaskStatus::Failed {
1136 error: msg.to_string(),
1137 }
1138 };
1139
1140 Ok(TaskResult {
1141 task: Task::CreatePr {
1142 feature_slug: self.state.feature.slug.clone(),
1143 },
1144 status,
1145 turns: resp.result.as_ref().map_or(1, |r| r.num_turns),
1146 cost_usd: pr_metrics.cost_usd,
1147 duration: start.elapsed(),
1148 artifacts: vec![],
1149 })
1150 }
1151
1152 fn emit_event(&self, event: RunEvent) {
1158 if let Some(tx) = &self.progress_tx {
1159 let _ = tx.send(event);
1160 }
1161 }
1162
1163 fn commit_coda_state(&self) -> Result<(), CoreError> {
1171 self.git.add(&self.worktree_path, &[".coda/"])?;
1172
1173 if self.git.has_staged_changes(&self.worktree_path) {
1174 let msg = format!("chore({}): update execution state", self.state.feature.slug);
1175 self.git.commit(&self.worktree_path, &msg)?;
1176 info!("Committed .coda/ state updates");
1177 } else {
1178 debug!("No .coda/ changes to commit");
1179 }
1180
1181 Ok(())
1182 }
1183
1184 async fn send_and_collect(&mut self, prompt: &str) -> Result<AgentResponse, CoreError> {
1195 self.client
1196 .query(prompt)
1197 .await
1198 .map_err(|e| CoreError::AgentError(e.to_string()))?;
1199
1200 let mut resp = AgentResponse::default();
1201
1202 {
1203 let mut stream = self.client.receive_response();
1204 while let Some(result) = stream.next().await {
1205 let msg = result.map_err(|e| CoreError::AgentError(e.to_string()))?;
1206 match msg {
1207 Message::Assistant(assistant) => {
1208 for block in &assistant.message.content {
1209 match block {
1210 ContentBlock::Text(text) => {
1211 resp.text.push_str(&text.text);
1212 }
1213 ContentBlock::ToolResult(tr) => {
1214 collect_tool_result_text(
1215 tr.content.as_ref(),
1216 &mut resp.tool_output,
1217 );
1218 }
1219 _ => {}
1220 }
1221 }
1222 }
1223 Message::User(user) => {
1224 if let Some(blocks) = &user.content {
1225 for block in blocks {
1226 if let ContentBlock::ToolResult(tr) = block {
1227 collect_tool_result_text(
1228 tr.content.as_ref(),
1229 &mut resp.tool_output,
1230 );
1231 }
1232 }
1233 }
1234 }
1235 Message::Result(r) => {
1236 resp.result = Some(r);
1237 break;
1238 }
1239 _ => {}
1240 }
1241 }
1242 }
1243
1244 if resp.text.is_empty() && resp.tool_output.is_empty() {
1245 let reason = resp
1246 .result
1247 .as_ref()
1248 .map(|r| {
1249 format!(
1250 "turns={}, cost={:?}, is_error={}",
1251 r.num_turns, r.total_cost_usd, r.is_error,
1252 )
1253 })
1254 .unwrap_or_else(|| "no ResultMessage received".to_string());
1255
1256 error!(reason = %reason, "Agent returned empty response");
1257 if let Some(logger) = &mut self.run_logger {
1258 logger.log_message(&format!(
1259 "⚠ EMPTY RESPONSE detected\n prompt_len={}\n reason: {reason}",
1260 prompt.len(),
1261 ));
1262 }
1263
1264 return Err(CoreError::AgentError(format!(
1265 "Agent returned empty response (session may be disconnected): {reason}",
1266 )));
1267 }
1268
1269 Ok(resp)
1270 }
1271
1272 fn mark_phase_running(&mut self, phase_idx: usize) {
1274 self.state.phases[phase_idx].status = PhaseStatus::Running;
1275 self.state.phases[phase_idx].started_at = Some(chrono::Utc::now());
1276 if let Err(e) = self.save_state() {
1277 warn!(error = %e, "Failed to save state when marking phase as running");
1278 }
1279 }
1280
1281 fn complete_phase(&mut self, phase_idx: usize, outcome: PhaseOutcome) {
1286 let phase = &mut self.state.phases[phase_idx];
1287 phase.status = PhaseStatus::Completed;
1288 phase.completed_at = Some(chrono::Utc::now());
1289 phase.turns = outcome.turns;
1290 phase.cost_usd = outcome.cost_usd;
1291 phase.cost.input_tokens = outcome.input_tokens;
1292 phase.cost.output_tokens = outcome.output_tokens;
1293 phase.duration_secs = outcome.duration.as_secs();
1294 phase.details = outcome.details;
1295 self.state.feature.updated_at = chrono::Utc::now();
1296
1297 if let Err(e) = self.save_state() {
1298 warn!(error = %e, "Failed to save state after completing phase");
1299 }
1300 }
1301
1302 fn save_state(&self) -> Result<(), CoreError> {
1304 let yaml = serde_yaml::to_string(&self.state)?;
1305 std::fs::write(&self.state_path, yaml).map_err(CoreError::IoError)?;
1306 debug!(path = %self.state_path.display(), "State saved");
1307 Ok(())
1308 }
1309
1310 fn load_spec(&self, filename: &str) -> Result<String, CoreError> {
1316 let spec_path = self
1317 .worktree_path
1318 .join(".coda")
1319 .join(&self.state.feature.slug)
1320 .join("specs")
1321 .join(filename);
1322
1323 std::fs::read_to_string(&spec_path).map_err(|e| {
1324 CoreError::StateError(format!("Cannot read spec at {}: {e}", spec_path.display()))
1325 })
1326 }
1327
1328 fn get_diff(&self) -> Result<String, CoreError> {
1330 self.git
1331 .diff(&self.worktree_path, &self.state.git.base_branch)
1332 }
1333
1334 fn get_commits(&self) -> Result<Vec<CommitInfo>, CoreError> {
1336 let range = format!("{}..HEAD", self.state.git.base_branch);
1337 let stdout = self.git.log_oneline(&self.worktree_path, &range)?;
1338
1339 let commits = stdout
1340 .lines()
1341 .filter(|l| !l.is_empty())
1342 .filter_map(|line| {
1343 let mut parts = line.splitn(2, ' ');
1344 let sha = parts.next()?.to_string();
1345 let message = parts.next().unwrap_or("").to_string();
1346 Some(CommitInfo { sha, message })
1347 })
1348 .collect();
1349
1350 Ok(commits)
1351 }
1352
1353 fn build_resume_context(&self) -> Result<String, CoreError> {
1355 let completed_phases: Vec<serde_json::Value> = self
1356 .state
1357 .phases
1358 .iter()
1359 .filter(|p| p.status == PhaseStatus::Completed)
1360 .map(|p| {
1361 let summary = format!(
1362 "{} turns, {}s, {} input / {} output tokens",
1363 p.turns, p.duration_secs, p.cost.input_tokens, p.cost.output_tokens
1364 );
1365 serde_json::json!({
1366 "name": p.name,
1367 "duration_secs": p.duration_secs,
1368 "turns": p.turns,
1369 "cost": {
1370 "input_tokens": p.cost.input_tokens,
1371 "output_tokens": p.cost.output_tokens,
1372 },
1373 "summary": summary,
1374 })
1375 })
1376 .collect();
1377
1378 let current_phase_name = &self.state.phases[self.state.current_phase as usize].name;
1379 let current_phase_state = &self.state.phases[self.state.current_phase as usize];
1380
1381 self.pm
1382 .render(
1383 "run/resume",
1384 minijinja::context!(
1385 state => &self.state,
1386 completed_phases => completed_phases,
1387 current_phase => current_phase_name,
1388 current_phase_state => current_phase_state,
1389 ),
1390 )
1391 .map_err(CoreError::from)
1392 }
1393
1394 fn update_totals(&mut self) {
1396 let mut total_turns = 0u32;
1397 let mut total_cost = 0.0f64;
1398 let mut total_duration = 0u64;
1399 let mut total_input_tokens = 0u64;
1400 let mut total_output_tokens = 0u64;
1401
1402 for phase in &self.state.phases {
1403 total_turns += phase.turns;
1404 total_cost += phase.cost_usd;
1405 total_duration += phase.duration_secs;
1406 total_input_tokens += phase.cost.input_tokens;
1407 total_output_tokens += phase.cost.output_tokens;
1408 }
1409
1410 self.state.total.turns = total_turns;
1411 self.state.total.cost_usd = total_cost;
1412 self.state.total.duration_secs = total_duration;
1413 self.state.total.cost.input_tokens = total_input_tokens;
1414 self.state.total.cost.output_tokens = total_output_tokens;
1415 }
1416
1417 fn check_pr_exists_via_gh(&self) -> Option<String> {
1422 let branch = &self.state.git.branch;
1423 self.gh.pr_url_for_branch(branch, &self.worktree_path)
1424 }
1425}
1426
1427impl std::fmt::Debug for Runner {
1428 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1429 f.debug_struct("Runner")
1430 .field("feature", &self.state.feature.slug)
1431 .field("current_phase", &self.state.current_phase)
1432 .field("worktree", &self.worktree_path)
1433 .finish_non_exhaustive()
1434 }
1435}
1436
1437fn collect_tool_result_text(content: Option<&ToolResultContent>, buf: &mut String) {
1441 match content {
1442 Some(ToolResultContent::Text(text)) => {
1443 if !buf.is_empty() {
1444 buf.push('\n');
1445 }
1446 buf.push_str(text);
1447 }
1448 Some(ToolResultContent::Blocks(blocks)) => {
1449 for block in blocks {
1450 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
1451 if !buf.is_empty() {
1452 buf.push('\n');
1453 }
1454 buf.push_str(text);
1455 }
1456 }
1457 }
1458 None => {}
1459 }
1460}
1461
1462fn find_feature_dir(project_root: &Path, feature_slug: &str) -> Result<PathBuf, CoreError> {
1477 let trees_dir = project_root.join(".trees");
1478 if !trees_dir.is_dir() {
1479 return Err(CoreError::ConfigError(format!(
1480 "No .trees/ directory found at {}. Run `coda init` first.",
1481 trees_dir.display()
1482 )));
1483 }
1484
1485 let worktree_path = trees_dir.join(feature_slug);
1487 let feature_dir = worktree_path.join(".coda").join(feature_slug);
1488
1489 if feature_dir.is_dir() && feature_dir.join("state.yml").is_file() {
1490 return Ok(feature_dir);
1491 }
1492
1493 let entries = std::fs::read_dir(&trees_dir).map_err(CoreError::IoError)?;
1495 let mut available_features = Vec::new();
1496
1497 for entry in entries.flatten() {
1498 if !entry.file_type().is_ok_and(|ft| ft.is_dir()) {
1499 continue;
1500 }
1501 let name = entry.file_name();
1502 let name_str = name.to_string_lossy();
1503
1504 let candidate = entry.path().join(".coda").join(feature_slug);
1505 if candidate.is_dir() && candidate.join("state.yml").is_file() {
1506 return Ok(candidate);
1507 }
1508
1509 let coda_dir = entry.path().join(".coda");
1511 if coda_dir.is_dir()
1512 && let Ok(coda_entries) = std::fs::read_dir(&coda_dir)
1513 {
1514 for ce in coda_entries.flatten() {
1515 if ce.file_type().is_ok_and(|ft| ft.is_dir())
1516 && ce.path().join("state.yml").is_file()
1517 {
1518 available_features.push(ce.file_name().to_string_lossy().to_string());
1519 }
1520 }
1521 }
1522
1523 if available_features.is_empty() {
1525 available_features.push(name_str.to_string());
1526 }
1527 }
1528
1529 let hint = if available_features.is_empty() {
1530 "No features have been planned yet.".to_string()
1531 } else {
1532 format!("Available features: {}", available_features.join(", "))
1533 };
1534
1535 Err(CoreError::StateError(format!(
1536 "No feature directory found for slug '{feature_slug}'. {hint}\nRun `coda plan {feature_slug}` first.",
1537 )))
1538}
1539
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline successful `ResultMessage` (1000 ms, 3 turns, no cost/usage).
    ///
    /// Individual tests override only the fields they care about via
    /// struct-update syntax, which keeps the fixtures short and focused.
    fn success_result() -> ResultMessage {
        ResultMessage {
            subtype: "success".to_string(),
            duration_ms: 1000,
            duration_api_ms: 800,
            is_error: false,
            num_turns: 3,
            session_id: "test".to_string(),
            total_cost_usd: None,
            usage: None,
            result: None,
            structured_output: None,
        }
    }

    #[test]
    fn test_should_compute_incremental_metrics_from_result() {
        let mut tracker = MetricsTracker::default();

        // First result: the increment equals the reported values.
        let result1 = ResultMessage {
            total_cost_usd: Some(0.50),
            usage: Some(serde_json::json!({
                "input_tokens": 1000,
                "output_tokens": 500,
            })),
            ..success_result()
        };

        let m1 = tracker.record(&Some(result1));
        assert!((m1.cost_usd - 0.50).abs() < f64::EPSILON);
        assert_eq!(m1.input_tokens, 1000);
        assert_eq!(m1.output_tokens, 500);

        // Second result reports larger totals; the tracker must return only
        // the difference from the previous record.
        let result2 = ResultMessage {
            duration_ms: 2000,
            duration_api_ms: 1600,
            num_turns: 2,
            total_cost_usd: Some(0.80),
            usage: Some(serde_json::json!({
                "input_tokens": 2500,
                "output_tokens": 1200,
            })),
            ..success_result()
        };

        let m2 = tracker.record(&Some(result2));
        assert!((m2.cost_usd - 0.30).abs() < f64::EPSILON);
        assert_eq!(m2.input_tokens, 1500);
        assert_eq!(m2.output_tokens, 700);
    }

    #[test]
    fn test_should_handle_none_result_gracefully() {
        let mut tracker = MetricsTracker::default();
        let m = tracker.record(&None);
        assert!(m.cost_usd.abs() < f64::EPSILON);
        assert_eq!(m.input_tokens, 0);
        assert_eq!(m.output_tokens, 0);
    }

    #[test]
    fn test_should_accumulate_metrics_across_rounds() {
        let mut acc = PhaseMetricsAccumulator::new();

        let resp1 = AgentResponse {
            text: "Review response".to_string(),
            tool_output: String::new(),
            result: Some(success_result()),
        };
        let m1 = IncrementalMetrics {
            cost_usd: 0.10,
            input_tokens: 500,
            output_tokens: 200,
        };
        acc.record(&resp1, m1);

        let resp2 = AgentResponse {
            text: "Fix response".to_string(),
            tool_output: String::new(),
            result: Some(ResultMessage {
                duration_ms: 2000,
                duration_api_ms: 1600,
                num_turns: 5,
                ..success_result()
            }),
        };
        let m2 = IncrementalMetrics {
            cost_usd: 0.15,
            input_tokens: 800,
            output_tokens: 300,
        };
        acc.record(&resp2, m2);

        // Turns come from the responses (3 + 5); cost and tokens from the
        // supplied incremental metrics.
        assert_eq!(acc.turns, 8);
        assert!((acc.cost_usd - 0.25).abs() < f64::EPSILON);
        assert_eq!(acc.input_tokens, 1300);
        assert_eq!(acc.output_tokens, 500);

        let outcome = acc.into_outcome(serde_json::json!({"test": true}));
        assert_eq!(outcome.turns, 8);
        assert!((outcome.cost_usd - 0.25).abs() < f64::EPSILON);
    }

    #[test]
    fn test_should_collect_agent_response_all_text() {
        let resp = AgentResponse {
            text: "assistant text".to_string(),
            tool_output: "tool output".to_string(),
            result: None,
        };
        let all = resp.all_text();
        assert!(all.contains("assistant text"));
        assert!(all.contains("tool output"));

        let resp_no_tool = AgentResponse {
            text: "only text".to_string(),
            tool_output: String::new(),
            result: None,
        };
        assert_eq!(resp_no_tool.all_text(), "only text");
    }
}