1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4use std::sync::mpsc;
5use std::time::Duration;
6
7use anyhow::{Context, Result, anyhow};
8use include_dir::{Dir, include_dir};
9use serde::Serialize;
10use serde_json::Value;
11use tokio::fs;
12use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
13use tokio::process::Command;
14use tokio::task::JoinHandle;
15use tokio::time::timeout;
16use tracing::{debug, error, info, warn};
17
18use crate::domain::ai::{AiProvider, AiSessionMode};
19use crate::domain::config::{AppConfig, PromptTransport};
20use crate::domain::diff::{DiffDocument, DiffFile, DiffHunk};
21use crate::domain::reference::parse_file_references;
22use crate::domain::review::{Author, CommentStatus, LineComment, ReviewState};
23use crate::git::diff::{DiffSource, load_git_diff};
24use crate::services::review_service::{AddReplyInput, ReviewService};
25
/// Prompt templates for AI sessions, embedded via `include_dir!` from
/// `prompts/ai_session` under the crate manifest directory.
static AI_SESSION_PROMPTS_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/prompts/ai_session");
27
/// Parameters for one AI session run over a review.
#[derive(Debug, Clone)]
pub struct RunAiSessionInput {
    /// Name of the review whose threads are processed.
    pub review_name: String,
    /// Provider whose configured client is invoked.
    pub provider: AiProvider,
    /// Explicit thread ids to process; when empty, every comment that is
    /// targetable for `mode` is selected.
    pub comment_ids: Vec<u64>,
    /// Session mode (reply or refactor).
    pub mode: AiSessionMode,
    /// Source of the git diff that is loaded as extra prompt context.
    pub diff_source: DiffSource,
}
36
/// Serializable summary of a finished AI session.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct AiSessionResult {
    /// Name of the review the session ran against.
    pub review_name: String,
    /// Provider name as returned by `AiProvider::as_str`.
    pub provider: String,
    /// Session mode as returned by `AiSessionMode::as_str`.
    pub mode: String,
    /// Client value copied from the provider configuration.
    pub client: String,
    /// Model copied from the provider configuration, if one is set.
    pub model: Option<String>,
    /// Identifier formatted as `<review>-<provider>-<start time in ms>`.
    pub session_id: String,
    /// Number of threads that received a persisted AI reply.
    pub processed: usize,
    /// Number of threads (or whole-session entries) that were skipped.
    pub skipped: usize,
    /// Number of threads that failed.
    pub failed: usize,
    /// Per-thread outcomes, in processing order.
    pub items: Vec<AiSessionItemResult>,
}
51
/// Outcome recorded for a single thread (or for the session as a whole).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct AiSessionItemResult {
    /// Id of the thread's comment; `0` is used for session-level entries.
    pub comment_id: u64,
    /// One of `"processed"`, `"skipped"` or `"failed"`.
    pub status: String,
    /// Human-readable explanation of the outcome.
    pub message: String,
}
59
/// Progress notification sent to listeners while a session is running.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct AiProgressEvent {
    /// Milliseconds since the Unix epoch when the event was created
    /// (`0` if the system clock is before the epoch).
    pub timestamp_ms: u64,
    /// Provider name as returned by `AiProvider::as_str`.
    pub provider: String,
    /// Origin of the message: `"system"`, `"stdout"` or `"stderr"`.
    pub stream: String,
    /// Event text; for provider streams this is one line of output.
    pub message: String,
}
68
/// Result of a single provider client invocation.
#[derive(Debug, Clone)]
struct ProviderInvocation {
    /// Reply text produced by the provider.
    reply: String,
    /// Model detected from the provider output, falling back to the configured model.
    model: Option<String>,
}
74
75pub fn default_ai_session_mode(comment_ids: &[u64]) -> AiSessionMode {
76 if comment_ids.is_empty() {
77 AiSessionMode::Refactor
78 } else {
79 AiSessionMode::Reply
80 }
81}
82
83pub async fn run_ai_session(
84 service: &ReviewService,
85 input: RunAiSessionInput,
86) -> Result<AiSessionResult> {
87 run_ai_session_inner(service, input, None).await
88}
89
90pub async fn run_ai_session_with_progress(
91 service: &ReviewService,
92 input: RunAiSessionInput,
93 progress_sender: mpsc::Sender<AiProgressEvent>,
94) -> Result<AiSessionResult> {
95 run_ai_session_inner(service, input, Some(progress_sender)).await
96}
97
98async fn run_ai_session_inner(
99 service: &ReviewService,
100 input: RunAiSessionInput,
101 progress_sender: Option<mpsc::Sender<AiProgressEvent>>,
102) -> Result<AiSessionResult> {
103 info!(
104 review = %input.review_name,
105 provider = %input.provider.as_str(),
106 requested_comments = input.comment_ids.len(),
107 "starting ai session"
108 );
109 let config = service.load_config().await?;
110 let mut review = service.load_review(&input.review_name).await?;
111 let diff_document = match load_git_diff(&config, &input.diff_source).await {
112 Ok(document) => Some(document),
113 Err(error) => {
114 warn!(error = %error, "ai session prompt context: unable to load git diff");
115 None
116 }
117 };
118 let now_ms = now_ms()?;
119 let provider_cfg = config.ai.provider_config(input.provider);
120 let mut result = AiSessionResult {
121 review_name: input.review_name.clone(),
122 provider: input.provider.as_str().to_string(),
123 mode: input.mode.as_str().to_string(),
124 client: provider_cfg.client.clone(),
125 model: provider_cfg.model.clone(),
126 session_id: format!("{}-{}-{now_ms}", input.review_name, input.provider.as_str()),
127 processed: 0,
128 skipped: 0,
129 failed: 0,
130 items: Vec::new(),
131 };
132
133 if matches!(review.state, ReviewState::Done) {
134 warn!(
135 review = %input.review_name,
136 provider = %input.provider.as_str(),
137 "ai session skipped because review is done"
138 );
139 result.items.push(AiSessionItemResult {
140 comment_id: 0,
141 status: "skipped".to_string(),
142 message: "review is done; ai session ignored".to_string(),
143 });
144 result.skipped = 1;
145 return Ok(result);
146 }
147
148 let target_ids: Vec<u64> = if input.comment_ids.is_empty() {
149 review
150 .comments
151 .iter()
152 .filter(|comment| comment_is_targetable(comment.status.clone(), input.mode))
153 .map(|comment| comment.id)
154 .collect()
155 } else {
156 input.comment_ids.clone()
157 };
158 let total_targets = target_ids.len();
159 if total_targets == 0 {
160 result.items.push(AiSessionItemResult {
161 comment_id: 0,
162 status: "skipped".to_string(),
163 message: match input.mode {
164 AiSessionMode::Reply => "no replyable threads to process".to_string(),
165 AiSessionMode::Refactor => "no open threads to process".to_string(),
166 },
167 });
168 result.skipped = 1;
169 emit_progress(
170 progress_sender.as_ref(),
171 input.provider,
172 "system",
173 "no open threads to process",
174 );
175 return Ok(result);
176 }
177
178 let explicit_selection = !input.comment_ids.is_empty();
179 for (step_index, comment_id) in target_ids.into_iter().enumerate() {
180 emit_progress(
181 progress_sender.as_ref(),
182 input.provider,
183 "system",
184 format!(
185 "thread #{comment_id}: start ({}/{})",
186 step_index + 1,
187 total_targets
188 ),
189 );
190 debug!(
191 review = %input.review_name,
192 provider = %input.provider.as_str(),
193 comment_id,
194 "processing ai thread"
195 );
196 let maybe_comment = review
197 .comments
198 .iter()
199 .find(|comment| comment.id == comment_id);
200 let Some(comment) = maybe_comment else {
201 warn!(
202 review = %input.review_name,
203 provider = %input.provider.as_str(),
204 comment_id,
205 "ai session target comment not found"
206 );
207 result.failed += 1;
208 result.items.push(AiSessionItemResult {
209 comment_id,
210 status: "failed".to_string(),
211 message: "comment not found in review".to_string(),
212 });
213 emit_progress(
214 progress_sender.as_ref(),
215 input.provider,
216 "system",
217 format!("thread #{comment_id}: failed (comment not found)"),
218 );
219 continue;
220 };
221
222 let allow_selected_reply = explicit_selection && matches!(input.mode, AiSessionMode::Reply);
223 if !comment_is_targetable(comment.status.clone(), input.mode) && !allow_selected_reply {
224 debug!(
225 review = %input.review_name,
226 provider = %input.provider.as_str(),
227 comment_id,
228 status = ?comment.status,
229 "skipping non-targetable comment for selected mode"
230 );
231 result.skipped += 1;
232 result.items.push(AiSessionItemResult {
233 comment_id,
234 status: "skipped".to_string(),
235 message: format!(
236 "comment status {:?} is not targetable for {} mode",
237 comment.status,
238 input.mode.as_str()
239 ),
240 });
241 emit_progress(
242 progress_sender.as_ref(),
243 input.provider,
244 "system",
245 format!(
246 "thread #{comment_id}: skipped (status={:?})",
247 comment.status
248 ),
249 );
250 continue;
251 }
252
253 let prompt = build_thread_prompt(
254 &input.review_name,
255 comment_id,
256 &review,
257 diff_document.as_ref(),
258 input.mode,
259 );
260 let provider_reply = match invoke_provider(
261 &config,
262 input.provider,
263 input.mode,
264 &prompt,
265 progress_sender.clone(),
266 )
267 .await
268 {
269 Ok(reply) => reply,
270 Err(error) => {
271 error!(
272 review = %input.review_name,
273 provider = %input.provider.as_str(),
274 comment_id,
275 error = %error,
276 "provider invocation failed"
277 );
278 result.failed += 1;
279 result.items.push(AiSessionItemResult {
280 comment_id,
281 status: "failed".to_string(),
282 message: format!("provider failed: {error}"),
283 });
284 emit_progress(
285 progress_sender.as_ref(),
286 input.provider,
287 "system",
288 format!("thread #{comment_id}: failed ({error})"),
289 );
290 continue;
291 }
292 };
293 let reply_body =
294 format_ai_reply_body(provider_reply.model.as_deref(), &provider_reply.reply);
295
296 let updated = match service
297 .add_reply(
298 &input.review_name,
299 AddReplyInput {
300 comment_id,
301 author: Author::Ai,
302 body: reply_body,
303 },
304 )
305 .await
306 {
307 Ok(value) => value,
308 Err(error) => {
309 error!(
310 review = %input.review_name,
311 provider = %input.provider.as_str(),
312 comment_id,
313 error = %error,
314 "failed to persist ai reply"
315 );
316 result.failed += 1;
317 result.items.push(AiSessionItemResult {
318 comment_id,
319 status: "failed".to_string(),
320 message: format!("failed to persist ai reply: {error}"),
321 });
322 emit_progress(
323 progress_sender.as_ref(),
324 input.provider,
325 "system",
326 format!("thread #{comment_id}: failed (persist reply: {error})"),
327 );
328 continue;
329 }
330 };
331
332 review = updated;
333 result.processed += 1;
334 info!(
335 review = %input.review_name,
336 provider = %input.provider.as_str(),
337 comment_id,
338 "ai reply persisted"
339 );
340 result.items.push(AiSessionItemResult {
341 comment_id,
342 status: "processed".to_string(),
343 message: match input.mode {
344 AiSessionMode::Reply => "ai reply added".to_string(),
345 AiSessionMode::Refactor => {
346 "ai reply added; thread status moved to pending_human".to_string()
347 }
348 },
349 });
350 emit_progress(
351 progress_sender.as_ref(),
352 input.provider,
353 "system",
354 format!(
355 "thread #{comment_id}: done ({}/{})",
356 step_index + 1,
357 total_targets
358 ),
359 );
360 }
361
362 info!(
363 review = %input.review_name,
364 provider = %input.provider.as_str(),
365 processed = result.processed,
366 skipped = result.skipped,
367 failed = result.failed,
368 "ai session completed"
369 );
370 Ok(result)
371}
372
373fn build_thread_prompt(
374 review_name: &str,
375 comment_id: u64,
376 review: &crate::domain::review::ReviewSession,
377 diff_document: Option<&DiffDocument>,
378 mode: AiSessionMode,
379) -> String {
380 let Some(comment) = review
381 .comments
382 .iter()
383 .find(|comment| comment.id == comment_id)
384 else {
385 return missing_comment_prompt(review_name, comment_id);
386 };
387
388 let mut thread = String::new();
389 thread.push_str(&format!("Review: {review_name}\n"));
390 thread.push_str(&format!(
391 "Thread comment id: {}\nFile: {}\nLine: {}:{}\nStatus: {:?}\n",
392 comment.id,
393 comment.file_path,
394 comment
395 .old_line
396 .map(|value| value.to_string())
397 .unwrap_or_else(|| "_".to_string()),
398 comment
399 .new_line
400 .map(|value| value.to_string())
401 .unwrap_or_else(|| "_".to_string()),
402 comment.status
403 ));
404 thread.push_str("\nOriginal comment:\n");
405 thread.push_str(&comment.body);
406 thread.push_str("\n\nReplies so far:\n");
407 if comment.replies.is_empty() {
408 thread.push_str("- (none)\n");
409 } else {
410 for reply in &comment.replies {
411 let author = match reply.author {
412 Author::User => "user",
413 Author::Ai => "ai",
414 };
415 thread.push_str(&format!("- {}: {}\n", author, reply.body));
416 }
417 }
418 append_target_file_and_diff_context(&mut thread, comment, diff_document);
419 append_referenced_files_context(&mut thread, comment);
420
421 match mode {
422 AiSessionMode::Reply => {
423 thread.push_str(prompt_template("reply_task.md"));
424 }
425 AiSessionMode::Refactor => {
426 thread.push_str(prompt_template("refactor_task.md"));
427 }
428 }
429 thread
430}
431
432fn append_target_file_and_diff_context(
433 prompt: &mut String,
434 comment: &LineComment,
435 diff_document: Option<&DiffDocument>,
436) {
437 prompt.push_str("\n\nPrimary target context:\n");
438 let target_line = comment.new_line.or(comment.old_line);
439 match target_line {
440 Some(line) => {
441 prompt.push_str(&format!(
442 "- thread anchor: {}:{}\n",
443 comment.file_path, line
444 ));
445 if let Some(resolved) = resolve_workspace_path(&comment.file_path) {
446 if let Some(snippet) = file_line_snippet(&resolved, line) {
447 prompt.push_str(&format!(
448 " file snippet around {}:{}:\n{}",
449 comment.file_path, line, snippet
450 ));
451 } else {
452 prompt.push_str(" file snippet: unavailable for requested line\n");
453 }
454 } else {
455 prompt.push_str(" file snippet: file not found in workspace\n");
456 }
457 }
458 None => {
459 prompt.push_str(&format!(
460 "- thread anchor: {} (line unavailable)\n",
461 comment.file_path
462 ));
463 }
464 }
465
466 if let Some(document) = diff_document {
467 if let Some(file) = find_diff_file(document, &comment.file_path) {
468 if let Some(hunk) = choose_best_hunk(file, comment.old_line, comment.new_line) {
469 let excerpt = format_hunk_excerpt(hunk, comment.old_line, comment.new_line, 28);
470 prompt.push_str(" nearest diff hunk:\n");
471 prompt.push_str(&excerpt);
472 } else {
473 prompt.push_str(" nearest diff hunk: none for this file\n");
474 }
475 } else {
476 prompt.push_str(" nearest diff hunk: file not present in current git diff\n");
477 }
478 } else {
479 prompt.push_str(" nearest diff hunk: unavailable (failed to load git diff)\n");
480 }
481}
482
483fn append_referenced_files_context(
484 prompt: &mut String,
485 comment: &crate::domain::review::LineComment,
486) {
487 let mut ordered = BTreeSet::new();
488 for reference in parse_file_references(&comment.body) {
489 ordered.insert((reference.path, reference.line));
490 }
491 for reply in &comment.replies {
492 for reference in parse_file_references(&reply.body) {
493 ordered.insert((reference.path, reference.line));
494 }
495 }
496 if ordered.is_empty() {
497 return;
498 }
499
500 prompt.push_str("\n\nReferenced files from thread mentions:\n");
501 for (path, line) in ordered.into_iter().take(8) {
502 let marker = if let Some(value) = line {
503 format!("{path}:{value}")
504 } else {
505 path.clone()
506 };
507 prompt.push_str(&format!("- {marker}\n"));
508 if let (Some(value), Some(resolved)) = (line, resolve_workspace_path(&path))
509 && let Some(snippet) = file_line_snippet(&resolved, value)
510 {
511 prompt.push_str(&format!(" context from {}:\n", resolved.display()));
512 prompt.push_str(&snippet);
513 }
514 }
515}
516
517fn find_diff_file<'a>(document: &'a DiffDocument, path: &str) -> Option<&'a DiffFile> {
518 document.files.iter().find(|file| file.path == path)
519}
520
521fn choose_best_hunk(
522 file: &DiffFile,
523 old_line: Option<u32>,
524 new_line: Option<u32>,
525) -> Option<&DiffHunk> {
526 if file.hunks.is_empty() {
527 return None;
528 }
529
530 for hunk in &file.hunks {
531 if hunk_contains_anchor(hunk, old_line, new_line) {
532 return Some(hunk);
533 }
534 }
535
536 let mut scored = file
537 .hunks
538 .iter()
539 .map(|hunk| (hunk_distance_to_anchor(hunk, old_line, new_line), hunk))
540 .collect::<Vec<_>>();
541 scored.sort_by_key(|(distance, _)| *distance);
542 scored.first().map(|(_, hunk)| *hunk)
543}
544
545fn hunk_contains_anchor(hunk: &DiffHunk, old_line: Option<u32>, new_line: Option<u32>) -> bool {
546 hunk.lines.iter().any(|line| {
547 old_line.is_some() && line.old_line == old_line
548 || new_line.is_some() && line.new_line == new_line
549 })
550}
551
552fn hunk_distance_to_anchor(hunk: &DiffHunk, old_line: Option<u32>, new_line: Option<u32>) -> u32 {
553 let mut best = u32::MAX;
554 if let Some(target_old) = old_line {
555 best = best.min(line_distance(hunk.old_start, target_old));
556 }
557 if let Some(target_new) = new_line {
558 best = best.min(line_distance(hunk.new_start, target_new));
559 }
560 if best == u32::MAX { 0 } else { best }
561}
562
/// Absolute difference between two line numbers.
fn line_distance(base: u32, target: u32) -> u32 {
    if base >= target {
        base - target
    } else {
        target - base
    }
}
566
567fn format_hunk_excerpt(
568 hunk: &DiffHunk,
569 old_line: Option<u32>,
570 new_line: Option<u32>,
571 max_lines: usize,
572) -> String {
573 if hunk.lines.is_empty() || max_lines == 0 {
574 return String::new();
575 }
576 let center = hunk
577 .lines
578 .iter()
579 .position(|line| {
580 old_line.is_some() && line.old_line == old_line
581 || new_line.is_some() && line.new_line == new_line
582 })
583 .unwrap_or(0);
584 let half_window = max_lines / 2;
585 let mut start = center.saturating_sub(half_window);
586 let end = (start + max_lines).min(hunk.lines.len());
587 if end - start < max_lines && end == hunk.lines.len() {
588 start = end.saturating_sub(max_lines);
589 }
590
591 let mut out = String::new();
592 for line in &hunk.lines[start..end] {
593 out.push_str(" ");
594 out.push_str(&line.raw);
595 out.push('\n');
596 }
597 out
598}
599
/// Resolves a path mentioned in a review to an existing file.
///
/// Surrounding whitespace is ignored. Absolute paths are used as-is; relative
/// paths are joined onto the current working directory. Returns `None` for an
/// empty path, when the working directory cannot be determined, or when the
/// result is not a regular file.
fn resolve_workspace_path(path: &str) -> Option<PathBuf> {
    let requested = Path::new(path.trim());
    if requested.as_os_str().is_empty() {
        return None;
    }

    let candidate = if requested.is_relative() {
        std::env::current_dir().ok()?.join(requested)
    } else {
        requested.to_path_buf()
    };

    if candidate.is_file() {
        Some(candidate)
    } else {
        None
    }
}
616
/// Reads `path` and returns the lines around the 1-based `line`, each rendered as
/// a right-aligned line number, a `|` separator and the line content.
///
/// The snippet covers up to two lines before and two lines after the target.
/// Returns `None` when `line` is `0`, the file cannot be read as UTF-8 text, or
/// the file has fewer than `line` lines.
fn file_line_snippet(path: &Path, line: u32) -> Option<String> {
    let target = usize::try_from(line.checked_sub(1)?).ok()?;
    let text = std::fs::read_to_string(path).ok()?;
    let lines: Vec<&str> = text.lines().collect();
    if target >= lines.len() {
        return None;
    }

    let first = target.saturating_sub(2);
    let last = (target + 3).min(lines.len());
    let snippet = lines
        .iter()
        .enumerate()
        .skip(first)
        .take(last - first)
        .map(|(index, content)| {
            let absolute = index + 1;
            format!(" {absolute:>5} | {content}\n")
        })
        .collect();
    Some(snippet)
}
637
638fn prompt_template(path: &str) -> &'static str {
639 AI_SESSION_PROMPTS_DIR
640 .get_file(path)
641 .unwrap_or_else(|| panic!("missing ai session prompt template: {path}"))
642 .contents_utf8()
643 .unwrap_or_else(|| panic!("invalid utf-8 in ai session prompt template: {path}"))
644}
645
646fn missing_comment_prompt(review_name: &str, comment_id: u64) -> String {
647 prompt_template("comment_not_found.md")
648 .replace("{review_name}", review_name)
649 .replace("{comment_id}", &comment_id.to_string())
650}
651
652async fn invoke_provider(
653 config: &AppConfig,
654 provider: AiProvider,
655 mode: AiSessionMode,
656 prompt: &str,
657 progress_sender: Option<mpsc::Sender<AiProgressEvent>>,
658) -> Result<ProviderInvocation> {
659 let provider_cfg = config.ai.provider_config(provider);
660 if provider_cfg.client.trim().is_empty() {
661 return Err(anyhow!(
662 "provider {} has no configured client in config.toml",
663 provider.as_str()
664 ));
665 }
666
667 let mut command = Command::new(&provider_cfg.client);
668 command.kill_on_drop(true);
669 let args = normalized_provider_args(provider, provider_cfg, mode);
670 command.args(&args);
671 let codex_output_path = codex_output_path(provider)?;
672 if let Some(path) = codex_output_path.as_ref() {
673 if !args.iter().any(|arg| arg == "--json") {
674 command.arg("--json");
675 }
676 command.arg("--output-last-message");
677 command.arg(path);
678 }
679 let configured_model = provider_cfg
680 .model
681 .as_deref()
682 .map(str::trim)
683 .filter(|value| !value.is_empty())
684 .map(str::to_string);
685 if let Some(model_value) = configured_model.as_deref() {
686 match provider_cfg.model_arg.as_deref().map(str::trim) {
687 Some(model_arg) if !model_arg.is_empty() => {
688 command.arg(model_arg);
689 command.arg(model_value);
690 }
691 _ => {
692 command.arg(model_value);
693 }
694 }
695 }
696 command.stdout(Stdio::piped()).stderr(Stdio::piped());
697
698 let prompt_transport = normalized_prompt_transport(provider, &provider_cfg.prompt_transport);
699 match prompt_transport {
700 PromptTransport::Stdin => {
701 command.stdin(Stdio::piped());
702 }
703 PromptTransport::Argv => {
704 command.arg(prompt);
705 command.stdin(Stdio::null());
706 }
707 }
708
709 let mut child = command
710 .spawn()
711 .with_context(|| format!("failed to start provider client '{}'", provider_cfg.client))?;
712 debug!(
713 provider = %provider.as_str(),
714 client = %provider_cfg.client,
715 prompt_chars = prompt.chars().count(),
716 "provider process spawned"
717 );
718 emit_progress(
719 progress_sender.as_ref(),
720 provider,
721 "system",
722 format!(
723 "spawned {} (mode={}, transport={})",
724 provider_cfg.client,
725 mode.as_str(),
726 match prompt_transport {
727 PromptTransport::Stdin => "stdin",
728 PromptTransport::Argv => "argv",
729 }
730 ),
731 );
732
733 if matches!(prompt_transport, PromptTransport::Stdin)
734 && let Some(mut stdin) = child.stdin.take()
735 {
736 stdin
737 .write_all(prompt.as_bytes())
738 .await
739 .context("failed to send prompt to provider stdin")?;
740 stdin.flush().await.ok();
741 }
742
743 let stdout_task = child.stdout.take().map(|stdout| {
744 tokio::spawn(read_stream(
745 stdout,
746 provider,
747 "stdout",
748 progress_sender.clone(),
749 ))
750 });
751 let stderr_task = child.stderr.take().map(|stderr| {
752 tokio::spawn(read_stream(
753 stderr,
754 provider,
755 "stderr",
756 progress_sender.clone(),
757 ))
758 });
759
760 let timeout_seconds = effective_timeout_seconds(config, mode);
761 let wait_result = timeout(Duration::from_secs(timeout_seconds), child.wait()).await;
762 let mut timed_out = false;
763 let status = match wait_result {
764 Ok(Ok(status)) => Some(status),
765 Ok(Err(error)) => return Err(anyhow!("provider process wait failed: {error}")),
766 Err(_) => {
767 timed_out = true;
768 let _ = child.kill().await;
769 None
770 }
771 };
772
773 let stdout = collect_stream_output(stdout_task).await;
774 let stderr = collect_stream_output(stderr_task).await;
775 let stderr_trimmed = stderr.trim().to_string();
776 let maybe_codex_reply = read_codex_output_last_message(codex_output_path.as_deref()).await?;
777
778 if timed_out {
779 let reply = maybe_codex_reply
780 .as_deref()
781 .unwrap_or(stdout.trim())
782 .trim()
783 .to_string();
784 if !reply.is_empty() {
785 warn!(
786 provider = %provider.as_str(),
787 mode = %mode.as_str(),
788 timeout_seconds,
789 "provider timed out but returned partial output"
790 );
791 emit_progress(
792 progress_sender.as_ref(),
793 provider,
794 "system",
795 format!("timeout after {timeout_seconds}s, returning partial output"),
796 );
797 return Ok(ProviderInvocation {
798 reply,
799 model: detect_runtime_model(provider, &stdout, &stderr)
800 .or(configured_model.clone()),
801 });
802 }
803
804 emit_progress(
805 progress_sender.as_ref(),
806 provider,
807 "system",
808 format!("timeout after {timeout_seconds}s with no output"),
809 );
810 return Err(anyhow!(
811 "provider {} timed out after {}s{}",
812 provider.as_str(),
813 timeout_seconds,
814 if stderr_trimmed.is_empty() {
815 "".to_string()
816 } else {
817 format!(": {stderr_trimmed}")
818 }
819 ));
820 }
821 let status = status.expect("status is present when not timed out");
822
823 if !status.success() {
824 warn!(
825 provider = %provider.as_str(),
826 status = %status,
827 stderr = %stderr_trimmed,
828 "provider exited with non-zero status"
829 );
830 emit_progress(
831 progress_sender.as_ref(),
832 provider,
833 "system",
834 format!("provider exited with {status}: {stderr_trimmed}"),
835 );
836 return Err(anyhow!(
837 "provider exited with {}: {}",
838 status,
839 if stderr_trimmed.is_empty() {
840 "no stderr output".to_string()
841 } else {
842 stderr_trimmed
843 }
844 ));
845 }
846
847 let reply = maybe_codex_reply.unwrap_or_else(|| stdout.trim().to_string());
848 if reply.is_empty() {
849 warn!(provider = %provider.as_str(), "provider returned empty output");
850 emit_progress(
851 progress_sender.as_ref(),
852 provider,
853 "system",
854 "provider returned empty output",
855 );
856 return Err(anyhow!("provider returned empty output"));
857 }
858
859 emit_progress(
860 progress_sender.as_ref(),
861 provider,
862 "system",
863 "provider completed successfully",
864 );
865 Ok(ProviderInvocation {
866 reply,
867 model: detect_runtime_model(provider, &stdout, &stderr).or(configured_model),
868 })
869}
870
/// Formats the body of a persisted AI reply.
///
/// Trailing whitespace is removed from `reply`. When a non-blank model name is
/// given, the body is prefixed with a `Model: <name>` header and a blank line.
fn format_ai_reply_body(model: Option<&str>, reply: &str) -> String {
    let body = reply.trim_end();
    match model.map(str::trim) {
        Some(name) if !name.is_empty() => format!("Model: {name}\n\n{body}"),
        _ => body.to_string(),
    }
}
879
880fn detect_runtime_model(provider: AiProvider, stdout: &str, stderr: &str) -> Option<String> {
881 match provider {
882 AiProvider::Codex => detect_model_from_json_stream(stdout)
883 .or_else(|| detect_model_from_json_stream(stderr))
884 .or_else(|| detect_model_from_text(stdout))
885 .or_else(|| detect_model_from_text(stderr)),
886 AiProvider::Claude | AiProvider::Opencode => {
887 detect_model_from_text(stdout).or_else(|| detect_model_from_text(stderr))
888 }
889 }
890}
891
892fn detect_model_from_json_stream(stream: &str) -> Option<String> {
893 for line in stream.lines() {
894 let trimmed = line.trim();
895 if trimmed.is_empty() || !trimmed.starts_with('{') {
896 continue;
897 }
898 let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
899 continue;
900 };
901 if let Some(model) = extract_model_from_json(&value) {
902 return Some(model);
903 }
904 }
905 None
906}
907
908fn extract_model_from_json(value: &Value) -> Option<String> {
909 match value {
910 Value::Object(map) => {
911 for key in [
912 "model",
913 "model_id",
914 "model_slug",
915 "resolved_model",
916 "selected_model",
917 ] {
918 if let Some(Value::String(found)) = map.get(key) {
919 let trimmed = found.trim();
920 if !trimmed.is_empty() {
921 return Some(trimmed.to_string());
922 }
923 }
924 }
925 for nested in map.values() {
926 if let Some(found) = extract_model_from_json(nested) {
927 return Some(found);
928 }
929 }
930 None
931 }
932 Value::Array(items) => {
933 for item in items {
934 if let Some(found) = extract_model_from_json(item) {
935 return Some(found);
936 }
937 }
938 None
939 }
940 _ => None,
941 }
942}
943
944fn detect_model_from_text(text: &str) -> Option<String> {
945 for line in text.lines() {
946 if let Some(value) = extract_model_after_marker(line, "model:") {
947 return Some(value);
948 }
949 if let Some(value) = extract_model_after_marker(line, "model=") {
950 return Some(value);
951 }
952 }
953 None
954}
955
/// Returns the first whitespace-delimited token after `marker` in `line`, with
/// surrounding quotes, commas and semicolons stripped.
///
/// Returns `None` when the marker is absent, nothing follows it, or the token is
/// empty after stripping.
fn extract_model_after_marker(line: &str, marker: &str) -> Option<String> {
    let (_, tail) = line.split_once(marker)?;
    let token = tail.split_whitespace().next()?;
    let model = token.trim_matches(|ch: char| matches!(ch, '"' | '\'' | ',' | ';'));
    (!model.is_empty()).then(|| model.to_string())
}
967
968fn normalized_provider_args(
969 provider: AiProvider,
970 provider_cfg: &crate::domain::config::AiProviderConfig,
971 mode: AiSessionMode,
972) -> Vec<String> {
973 let mut args = provider_cfg.args.clone();
974 match provider {
975 AiProvider::Codex => {
976 if !args.first().map(|value| value == "exec").unwrap_or(false) {
977 args.insert(0, "exec".to_string());
978 }
979 if !args.iter().any(|arg| arg == "--full-auto") {
980 args.push("--full-auto".to_string());
981 }
982 let has_sandbox_flag = args.iter().any(|arg| arg == "--sandbox" || arg == "-s");
983 if !has_sandbox_flag {
984 args.push("--sandbox".to_string());
985 args.push(match mode {
986 AiSessionMode::Reply => "read-only".to_string(),
987 AiSessionMode::Refactor => "workspace-write".to_string(),
988 });
989 }
990 }
991 AiProvider::Claude => {
992 if !args.iter().any(|arg| arg == "-p" || arg == "--print") {
993 args.insert(0, "-p".to_string());
994 }
995 }
996 AiProvider::Opencode => {
997 if !args.first().map(|value| value == "run").unwrap_or(false) {
998 args.insert(0, "run".to_string());
999 }
1000 }
1001 }
1002 args
1003}
1004
1005fn codex_output_path(provider: AiProvider) -> Result<Option<std::path::PathBuf>> {
1006 if !matches!(provider, AiProvider::Codex) {
1007 return Ok(None);
1008 }
1009 let file = format!("parley-codex-last-{}-{}.txt", now_ms()?, std::process::id());
1010 Ok(Some(std::env::temp_dir().join(file)))
1011}
1012
1013async fn read_codex_output_last_message(path: Option<&std::path::Path>) -> Result<Option<String>> {
1014 let Some(path) = path else {
1015 return Ok(None);
1016 };
1017 let text = match fs::read_to_string(path).await {
1018 Ok(content) => content.trim().to_string(),
1019 Err(_) => String::new(),
1020 };
1021 let _ = fs::remove_file(path).await;
1022 if text.is_empty() {
1023 Ok(None)
1024 } else {
1025 Ok(Some(text))
1026 }
1027}
1028
1029async fn read_stream<R>(
1030 reader: R,
1031 provider: AiProvider,
1032 stream: &'static str,
1033 progress_sender: Option<mpsc::Sender<AiProgressEvent>>,
1034) -> String
1035where
1036 R: AsyncRead + Unpin + Send + 'static,
1037{
1038 let mut lines = BufReader::new(reader).lines();
1039 let mut out = String::new();
1040 while let Ok(Some(line)) = lines.next_line().await {
1041 info!(provider = %provider.as_str(), stream, payload = %line, "provider_stream");
1042 emit_progress(progress_sender.as_ref(), provider, stream, line.as_str());
1043 out.push_str(&line);
1044 out.push('\n');
1045 }
1046 out
1047}
1048
1049async fn collect_stream_output(task: Option<JoinHandle<String>>) -> String {
1050 let Some(task) = task else {
1051 return String::new();
1052 };
1053 match task.await {
1054 Ok(content) => content,
1055 Err(error) => format!("<stream task join failed: {error}>"),
1056 }
1057}
1058
1059fn normalized_prompt_transport(
1060 provider: AiProvider,
1061 configured: &PromptTransport,
1062) -> PromptTransport {
1063 let _ = configured;
1064 match provider {
1065 AiProvider::Codex | AiProvider::Claude | AiProvider::Opencode => PromptTransport::Argv,
1067 }
1068}
1069
1070fn emit_progress(
1071 progress_sender: Option<&mpsc::Sender<AiProgressEvent>>,
1072 provider: AiProvider,
1073 stream: &str,
1074 message: impl Into<String>,
1075) {
1076 let Some(progress_sender) = progress_sender else {
1077 return;
1078 };
1079 let timestamp_ms = std::time::SystemTime::now()
1080 .duration_since(std::time::UNIX_EPOCH)
1081 .map(|elapsed| elapsed.as_millis() as u64)
1082 .unwrap_or(0);
1083 let _ = progress_sender.send(AiProgressEvent {
1084 timestamp_ms,
1085 provider: provider.as_str().to_string(),
1086 stream: stream.to_string(),
1087 message: message.into(),
1088 });
1089}
1090
1091fn comment_is_targetable(status: CommentStatus, mode: AiSessionMode) -> bool {
1092 match mode {
1093 AiSessionMode::Reply => {
1094 matches!(status, CommentStatus::Open | CommentStatus::Pending)
1095 }
1096 AiSessionMode::Refactor => matches!(status, CommentStatus::Open),
1097 }
1098}
1099
1100fn effective_timeout_seconds(config: &AppConfig, mode: AiSessionMode) -> u64 {
1101 let configured = config.ai.timeout_seconds.max(1);
1102 match mode {
1103 AiSessionMode::Reply => configured,
1104 AiSessionMode::Refactor => configured.max(600),
1106 }
1107}
1108
1109fn now_ms() -> Result<u64> {
1110 let elapsed = std::time::SystemTime::now()
1111 .duration_since(std::time::UNIX_EPOCH)
1112 .context("system clock is before unix epoch")?;
1113 Ok(elapsed.as_millis() as u64)
1114}
1115
#[cfg(test)]
mod tests {
    use super::{
        choose_best_hunk, comment_is_targetable, detect_model_from_json_stream,
        detect_model_from_text, format_ai_reply_body, format_hunk_excerpt, hunk_distance_to_anchor,
    };
    use crate::domain::ai::AiSessionMode;
    use crate::domain::diff::{DiffFile, DiffHunk, DiffLine, DiffLineKind};
    use crate::domain::review::CommentStatus;

    /// Checks `comment_is_targetable` for each `(status, expected)` pair in one mode.
    fn assert_targetability(mode: AiSessionMode, cases: [(CommentStatus, bool); 3]) {
        for (status, expected) in cases {
            assert_eq!(comment_is_targetable(status, mode), expected);
        }
    }

    /// Wraps hunks in a diff file for `src/lib.rs` with no header lines.
    fn file_with_hunks(hunks: Vec<DiffHunk>) -> DiffFile {
        DiffFile {
            path: "src/lib.rs".to_string(),
            header_lines: Vec::new(),
            hunks,
        }
    }

    #[test]
    fn reply_mode_excludes_addressed_threads() {
        assert_targetability(
            AiSessionMode::Reply,
            [
                (CommentStatus::Open, true),
                (CommentStatus::Pending, true),
                (CommentStatus::Addressed, false),
            ],
        );
    }

    #[test]
    fn refactor_mode_targets_only_open_threads() {
        assert_targetability(
            AiSessionMode::Refactor,
            [
                (CommentStatus::Open, true),
                (CommentStatus::Pending, false),
                (CommentStatus::Addressed, false),
            ],
        );
    }

    #[test]
    fn choose_best_hunk_prefers_exact_anchor_match() {
        let near_top = make_hunk(
            "@@ -1,3 +1,3 @@",
            1,
            1,
            vec![line_ctx(1, 1), line_ctx(2, 2)],
        );
        let around_forty = make_hunk(
            "@@ -40,3 +40,3 @@",
            40,
            40,
            vec![line_ctx(40, 40), line_ctx(41, 41)],
        );
        let file = file_with_hunks(vec![near_top, around_forty]);

        let chosen = choose_best_hunk(&file, None, Some(41)).expect("hunk should be selected");
        assert_eq!(chosen.new_start, 40);
    }

    #[test]
    fn choose_best_hunk_falls_back_to_nearest_start() {
        let file = file_with_hunks(vec![
            make_hunk("@@ -10,2 +10,2 @@", 10, 10, vec![line_ctx(10, 10)]),
            make_hunk("@@ -80,2 +80,2 @@", 80, 80, vec![line_ctx(80, 80)]),
        ]);

        let chosen = choose_best_hunk(&file, None, Some(74)).expect("hunk should be selected");
        assert_eq!(chosen.new_start, 80);
        assert!(hunk_distance_to_anchor(chosen, None, Some(74)) < 10);
    }

    #[test]
    fn hunk_excerpt_contains_anchor_line() {
        let body = vec![
            line_ctx(20, 20),
            line_add(0, 21, "+let value = 1;"),
            line_ctx(22, 22),
        ];
        let hunk = make_hunk("@@ -20,4 +20,4 @@", 20, 20, body);

        let excerpt = format_hunk_excerpt(&hunk, None, Some(21), 8);
        for expected in ["+let value = 1;", "@@ -20,4 +20,4 @@"] {
            assert!(excerpt.contains(expected));
        }
    }

    #[test]
    fn ai_reply_body_includes_model_header() {
        let body = format_ai_reply_body(Some("gpt-5.4"), "Implemented fix.");
        assert!(body.starts_with("Model: gpt-5.4"));
        assert!(body.contains("Implemented fix."));
    }

    #[test]
    fn ai_reply_body_omits_header_when_model_unknown() {
        assert_eq!(
            format_ai_reply_body(None, "Implemented fix."),
            "Implemented fix."
        );
    }

    #[test]
    fn detect_model_from_json_stream_reads_nested_model_slug() {
        let stream = r#"{"event":"meta","payload":{"session":{"model_slug":"gpt-5.4"}}}"#;
        let detected = detect_model_from_json_stream(stream).expect("model should be detected");
        assert_eq!(detected, "gpt-5.4");
    }

    #[test]
    fn detect_model_from_text_reads_model_marker() {
        let detected =
            detect_model_from_text("run complete; model=gpt-5.4; tokens=100").expect("model");
        assert_eq!(detected, "gpt-5.4");
    }

    /// Builds a hunk whose first line is the hunk header, followed by `extra`.
    fn make_hunk(header: &str, old_start: u32, new_start: u32, extra: Vec<DiffLine>) -> DiffHunk {
        let header_line = DiffLine {
            kind: DiffLineKind::HunkHeader,
            old_line: None,
            new_line: None,
            raw: header.to_string(),
            code: header.to_string(),
        };
        DiffHunk {
            old_start,
            old_count: 1,
            new_start,
            new_count: 1,
            header: header.to_string(),
            lines: std::iter::once(header_line).chain(extra).collect(),
        }
    }

    /// Builds an unchanged (context) line present on both sides of the diff.
    fn line_ctx(old: u32, new: u32) -> DiffLine {
        let code = format!("context {old}:{new}");
        DiffLine {
            kind: DiffLineKind::Context,
            old_line: Some(old),
            new_line: Some(new),
            raw: format!(" {code}"),
            code,
        }
    }

    /// Builds an added line; an `old` of `0` means "no old-side line number".
    fn line_add(old: u32, new: u32, raw: &str) -> DiffLine {
        DiffLine {
            kind: DiffLineKind::Added,
            old_line: (old != 0).then_some(old),
            new_line: Some(new),
            raw: raw.to_string(),
            code: raw.trim_start_matches('+').to_string(),
        }
    }
}