Skip to main content

parley/services/
ai_session.rs

1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4use std::sync::mpsc;
5use std::time::Duration;
6
7use anyhow::{Context, Result, anyhow};
8use include_dir::{Dir, include_dir};
9use serde::Serialize;
10use serde_json::Value;
11use tokio::fs;
12use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
13use tokio::process::Command;
14use tokio::task::JoinHandle;
15use tokio::time::timeout;
16use tracing::{debug, error, info, warn};
17
18use crate::domain::ai::{AiProvider, AiSessionMode};
19use crate::domain::config::{AppConfig, PromptTransport};
20use crate::domain::diff::{DiffDocument, DiffFile, DiffHunk};
21use crate::domain::reference::parse_file_references;
22use crate::domain::review::{Author, CommentStatus, LineComment, ReviewState};
23use crate::git::diff::{DiffSource, load_git_diff};
24use crate::services::review_service::{AddReplyInput, ReviewService};
25
26static AI_SESSION_PROMPTS_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/prompts/ai_session");
27
28#[derive(Debug, Clone)]
29pub struct RunAiSessionInput {
30    pub review_name: String,
31    pub provider: AiProvider,
32    pub comment_ids: Vec<u64>,
33    pub mode: AiSessionMode,
34    pub diff_source: DiffSource,
35}
36
37#[derive(Debug, Clone, Serialize)]
38#[serde(rename_all = "snake_case")]
39pub struct AiSessionResult {
40    pub review_name: String,
41    pub provider: String,
42    pub mode: String,
43    pub client: String,
44    pub model: Option<String>,
45    pub session_id: String,
46    pub processed: usize,
47    pub skipped: usize,
48    pub failed: usize,
49    pub items: Vec<AiSessionItemResult>,
50}
51
52#[derive(Debug, Clone, Serialize)]
53#[serde(rename_all = "snake_case")]
54pub struct AiSessionItemResult {
55    pub comment_id: u64,
56    pub status: String,
57    pub message: String,
58}
59
60#[derive(Debug, Clone, Serialize)]
61#[serde(rename_all = "snake_case")]
62pub struct AiProgressEvent {
63    pub timestamp_ms: u64,
64    pub provider: String,
65    pub stream: String,
66    pub message: String,
67}
68
69#[derive(Debug, Clone)]
70struct ProviderInvocation {
71    reply: String,
72    model: Option<String>,
73}
74
75pub fn default_ai_session_mode(comment_ids: &[u64]) -> AiSessionMode {
76    if comment_ids.is_empty() {
77        AiSessionMode::Refactor
78    } else {
79        AiSessionMode::Reply
80    }
81}
82
83pub async fn run_ai_session(
84    service: &ReviewService,
85    input: RunAiSessionInput,
86) -> Result<AiSessionResult> {
87    run_ai_session_inner(service, input, None).await
88}
89
90pub async fn run_ai_session_with_progress(
91    service: &ReviewService,
92    input: RunAiSessionInput,
93    progress_sender: mpsc::Sender<AiProgressEvent>,
94) -> Result<AiSessionResult> {
95    run_ai_session_inner(service, input, Some(progress_sender)).await
96}
97
98async fn run_ai_session_inner(
99    service: &ReviewService,
100    input: RunAiSessionInput,
101    progress_sender: Option<mpsc::Sender<AiProgressEvent>>,
102) -> Result<AiSessionResult> {
103    info!(
104        review = %input.review_name,
105        provider = %input.provider.as_str(),
106        requested_comments = input.comment_ids.len(),
107        "starting ai session"
108    );
109    let config = service.load_config().await?;
110    let mut review = service.load_review(&input.review_name).await?;
111    let diff_document = match load_git_diff(&config, &input.diff_source).await {
112        Ok(document) => Some(document),
113        Err(error) => {
114            warn!(error = %error, "ai session prompt context: unable to load git diff");
115            None
116        }
117    };
118    let now_ms = now_ms()?;
119    let provider_cfg = config.ai.provider_config(input.provider);
120    let mut result = AiSessionResult {
121        review_name: input.review_name.clone(),
122        provider: input.provider.as_str().to_string(),
123        mode: input.mode.as_str().to_string(),
124        client: provider_cfg.client.clone(),
125        model: provider_cfg.model.clone(),
126        session_id: format!("{}-{}-{now_ms}", input.review_name, input.provider.as_str()),
127        processed: 0,
128        skipped: 0,
129        failed: 0,
130        items: Vec::new(),
131    };
132
133    if matches!(review.state, ReviewState::Done) {
134        warn!(
135            review = %input.review_name,
136            provider = %input.provider.as_str(),
137            "ai session skipped because review is done"
138        );
139        result.items.push(AiSessionItemResult {
140            comment_id: 0,
141            status: "skipped".to_string(),
142            message: "review is done; ai session ignored".to_string(),
143        });
144        result.skipped = 1;
145        return Ok(result);
146    }
147
148    let target_ids: Vec<u64> = if input.comment_ids.is_empty() {
149        review
150            .comments
151            .iter()
152            .filter(|comment| comment_is_targetable(comment.status.clone(), input.mode))
153            .map(|comment| comment.id)
154            .collect()
155    } else {
156        input.comment_ids.clone()
157    };
158    let total_targets = target_ids.len();
159    if total_targets == 0 {
160        result.items.push(AiSessionItemResult {
161            comment_id: 0,
162            status: "skipped".to_string(),
163            message: match input.mode {
164                AiSessionMode::Reply => "no replyable threads to process".to_string(),
165                AiSessionMode::Refactor => "no open threads to process".to_string(),
166            },
167        });
168        result.skipped = 1;
169        emit_progress(
170            progress_sender.as_ref(),
171            input.provider,
172            "system",
173            "no open threads to process",
174        );
175        return Ok(result);
176    }
177
178    let explicit_selection = !input.comment_ids.is_empty();
179    for (step_index, comment_id) in target_ids.into_iter().enumerate() {
180        emit_progress(
181            progress_sender.as_ref(),
182            input.provider,
183            "system",
184            format!(
185                "thread #{comment_id}: start ({}/{})",
186                step_index + 1,
187                total_targets
188            ),
189        );
190        debug!(
191            review = %input.review_name,
192            provider = %input.provider.as_str(),
193            comment_id,
194            "processing ai thread"
195        );
196        let maybe_comment = review
197            .comments
198            .iter()
199            .find(|comment| comment.id == comment_id);
200        let Some(comment) = maybe_comment else {
201            warn!(
202                review = %input.review_name,
203                provider = %input.provider.as_str(),
204                comment_id,
205                "ai session target comment not found"
206            );
207            result.failed += 1;
208            result.items.push(AiSessionItemResult {
209                comment_id,
210                status: "failed".to_string(),
211                message: "comment not found in review".to_string(),
212            });
213            emit_progress(
214                progress_sender.as_ref(),
215                input.provider,
216                "system",
217                format!("thread #{comment_id}: failed (comment not found)"),
218            );
219            continue;
220        };
221
222        let allow_selected_reply = explicit_selection && matches!(input.mode, AiSessionMode::Reply);
223        if !comment_is_targetable(comment.status.clone(), input.mode) && !allow_selected_reply {
224            debug!(
225                review = %input.review_name,
226                provider = %input.provider.as_str(),
227                comment_id,
228                status = ?comment.status,
229                "skipping non-targetable comment for selected mode"
230            );
231            result.skipped += 1;
232            result.items.push(AiSessionItemResult {
233                comment_id,
234                status: "skipped".to_string(),
235                message: format!(
236                    "comment status {:?} is not targetable for {} mode",
237                    comment.status,
238                    input.mode.as_str()
239                ),
240            });
241            emit_progress(
242                progress_sender.as_ref(),
243                input.provider,
244                "system",
245                format!(
246                    "thread #{comment_id}: skipped (status={:?})",
247                    comment.status
248                ),
249            );
250            continue;
251        }
252
253        let prompt = build_thread_prompt(
254            &input.review_name,
255            comment_id,
256            &review,
257            diff_document.as_ref(),
258            input.mode,
259        );
260        let provider_reply = match invoke_provider(
261            &config,
262            input.provider,
263            input.mode,
264            &prompt,
265            progress_sender.clone(),
266        )
267        .await
268        {
269            Ok(reply) => reply,
270            Err(error) => {
271                error!(
272                    review = %input.review_name,
273                    provider = %input.provider.as_str(),
274                    comment_id,
275                    error = %error,
276                    "provider invocation failed"
277                );
278                result.failed += 1;
279                result.items.push(AiSessionItemResult {
280                    comment_id,
281                    status: "failed".to_string(),
282                    message: format!("provider failed: {error}"),
283                });
284                emit_progress(
285                    progress_sender.as_ref(),
286                    input.provider,
287                    "system",
288                    format!("thread #{comment_id}: failed ({error})"),
289                );
290                continue;
291            }
292        };
293        let reply_body =
294            format_ai_reply_body(provider_reply.model.as_deref(), &provider_reply.reply);
295
296        let updated = match service
297            .add_reply(
298                &input.review_name,
299                AddReplyInput {
300                    comment_id,
301                    author: Author::Ai,
302                    body: reply_body,
303                },
304            )
305            .await
306        {
307            Ok(value) => value,
308            Err(error) => {
309                error!(
310                    review = %input.review_name,
311                    provider = %input.provider.as_str(),
312                    comment_id,
313                    error = %error,
314                    "failed to persist ai reply"
315                );
316                result.failed += 1;
317                result.items.push(AiSessionItemResult {
318                    comment_id,
319                    status: "failed".to_string(),
320                    message: format!("failed to persist ai reply: {error}"),
321                });
322                emit_progress(
323                    progress_sender.as_ref(),
324                    input.provider,
325                    "system",
326                    format!("thread #{comment_id}: failed (persist reply: {error})"),
327                );
328                continue;
329            }
330        };
331
332        review = updated;
333        result.processed += 1;
334        info!(
335            review = %input.review_name,
336            provider = %input.provider.as_str(),
337            comment_id,
338            "ai reply persisted"
339        );
340        result.items.push(AiSessionItemResult {
341            comment_id,
342            status: "processed".to_string(),
343            message: match input.mode {
344                AiSessionMode::Reply => "ai reply added".to_string(),
345                AiSessionMode::Refactor => {
346                    "ai reply added; thread status moved to pending_human".to_string()
347                }
348            },
349        });
350        emit_progress(
351            progress_sender.as_ref(),
352            input.provider,
353            "system",
354            format!(
355                "thread #{comment_id}: done ({}/{})",
356                step_index + 1,
357                total_targets
358            ),
359        );
360    }
361
362    info!(
363        review = %input.review_name,
364        provider = %input.provider.as_str(),
365        processed = result.processed,
366        skipped = result.skipped,
367        failed = result.failed,
368        "ai session completed"
369    );
370    Ok(result)
371}
372
373fn build_thread_prompt(
374    review_name: &str,
375    comment_id: u64,
376    review: &crate::domain::review::ReviewSession,
377    diff_document: Option<&DiffDocument>,
378    mode: AiSessionMode,
379) -> String {
380    let Some(comment) = review
381        .comments
382        .iter()
383        .find(|comment| comment.id == comment_id)
384    else {
385        return missing_comment_prompt(review_name, comment_id);
386    };
387
388    let mut thread = String::new();
389    thread.push_str(&format!("Review: {review_name}\n"));
390    thread.push_str(&format!(
391        "Thread comment id: {}\nFile: {}\nLine: {}:{}\nStatus: {:?}\n",
392        comment.id,
393        comment.file_path,
394        comment
395            .old_line
396            .map(|value| value.to_string())
397            .unwrap_or_else(|| "_".to_string()),
398        comment
399            .new_line
400            .map(|value| value.to_string())
401            .unwrap_or_else(|| "_".to_string()),
402        comment.status
403    ));
404    thread.push_str("\nOriginal comment:\n");
405    thread.push_str(&comment.body);
406    thread.push_str("\n\nReplies so far:\n");
407    if comment.replies.is_empty() {
408        thread.push_str("- (none)\n");
409    } else {
410        for reply in &comment.replies {
411            let author = match reply.author {
412                Author::User => "user",
413                Author::Ai => "ai",
414            };
415            thread.push_str(&format!("- {}: {}\n", author, reply.body));
416        }
417    }
418    append_target_file_and_diff_context(&mut thread, comment, diff_document);
419    append_referenced_files_context(&mut thread, comment);
420
421    match mode {
422        AiSessionMode::Reply => {
423            thread.push_str(prompt_template("reply_task.md"));
424        }
425        AiSessionMode::Refactor => {
426            thread.push_str(prompt_template("refactor_task.md"));
427        }
428    }
429    thread
430}
431
432fn append_target_file_and_diff_context(
433    prompt: &mut String,
434    comment: &LineComment,
435    diff_document: Option<&DiffDocument>,
436) {
437    prompt.push_str("\n\nPrimary target context:\n");
438    let target_line = comment.new_line.or(comment.old_line);
439    match target_line {
440        Some(line) => {
441            prompt.push_str(&format!(
442                "- thread anchor: {}:{}\n",
443                comment.file_path, line
444            ));
445            if let Some(resolved) = resolve_workspace_path(&comment.file_path) {
446                if let Some(snippet) = file_line_snippet(&resolved, line) {
447                    prompt.push_str(&format!(
448                        "  file snippet around {}:{}:\n{}",
449                        comment.file_path, line, snippet
450                    ));
451                } else {
452                    prompt.push_str("  file snippet: unavailable for requested line\n");
453                }
454            } else {
455                prompt.push_str("  file snippet: file not found in workspace\n");
456            }
457        }
458        None => {
459            prompt.push_str(&format!(
460                "- thread anchor: {} (line unavailable)\n",
461                comment.file_path
462            ));
463        }
464    }
465
466    if let Some(document) = diff_document {
467        if let Some(file) = find_diff_file(document, &comment.file_path) {
468            if let Some(hunk) = choose_best_hunk(file, comment.old_line, comment.new_line) {
469                let excerpt = format_hunk_excerpt(hunk, comment.old_line, comment.new_line, 28);
470                prompt.push_str("  nearest diff hunk:\n");
471                prompt.push_str(&excerpt);
472            } else {
473                prompt.push_str("  nearest diff hunk: none for this file\n");
474            }
475        } else {
476            prompt.push_str("  nearest diff hunk: file not present in current git diff\n");
477        }
478    } else {
479        prompt.push_str("  nearest diff hunk: unavailable (failed to load git diff)\n");
480    }
481}
482
483fn append_referenced_files_context(
484    prompt: &mut String,
485    comment: &crate::domain::review::LineComment,
486) {
487    let mut ordered = BTreeSet::new();
488    for reference in parse_file_references(&comment.body) {
489        ordered.insert((reference.path, reference.line));
490    }
491    for reply in &comment.replies {
492        for reference in parse_file_references(&reply.body) {
493            ordered.insert((reference.path, reference.line));
494        }
495    }
496    if ordered.is_empty() {
497        return;
498    }
499
500    prompt.push_str("\n\nReferenced files from thread mentions:\n");
501    for (path, line) in ordered.into_iter().take(8) {
502        let marker = if let Some(value) = line {
503            format!("{path}:{value}")
504        } else {
505            path.clone()
506        };
507        prompt.push_str(&format!("- {marker}\n"));
508        if let (Some(value), Some(resolved)) = (line, resolve_workspace_path(&path))
509            && let Some(snippet) = file_line_snippet(&resolved, value)
510        {
511            prompt.push_str(&format!("  context from {}:\n", resolved.display()));
512            prompt.push_str(&snippet);
513        }
514    }
515}
516
517fn find_diff_file<'a>(document: &'a DiffDocument, path: &str) -> Option<&'a DiffFile> {
518    document.files.iter().find(|file| file.path == path)
519}
520
521fn choose_best_hunk(
522    file: &DiffFile,
523    old_line: Option<u32>,
524    new_line: Option<u32>,
525) -> Option<&DiffHunk> {
526    if file.hunks.is_empty() {
527        return None;
528    }
529
530    for hunk in &file.hunks {
531        if hunk_contains_anchor(hunk, old_line, new_line) {
532            return Some(hunk);
533        }
534    }
535
536    let mut scored = file
537        .hunks
538        .iter()
539        .map(|hunk| (hunk_distance_to_anchor(hunk, old_line, new_line), hunk))
540        .collect::<Vec<_>>();
541    scored.sort_by_key(|(distance, _)| *distance);
542    scored.first().map(|(_, hunk)| *hunk)
543}
544
545fn hunk_contains_anchor(hunk: &DiffHunk, old_line: Option<u32>, new_line: Option<u32>) -> bool {
546    hunk.lines.iter().any(|line| {
547        old_line.is_some() && line.old_line == old_line
548            || new_line.is_some() && line.new_line == new_line
549    })
550}
551
552fn hunk_distance_to_anchor(hunk: &DiffHunk, old_line: Option<u32>, new_line: Option<u32>) -> u32 {
553    let mut best = u32::MAX;
554    if let Some(target_old) = old_line {
555        best = best.min(line_distance(hunk.old_start, target_old));
556    }
557    if let Some(target_new) = new_line {
558        best = best.min(line_distance(hunk.new_start, target_new));
559    }
560    if best == u32::MAX { 0 } else { best }
561}
562
563fn line_distance(base: u32, target: u32) -> u32 {
564    base.abs_diff(target)
565}
566
567fn format_hunk_excerpt(
568    hunk: &DiffHunk,
569    old_line: Option<u32>,
570    new_line: Option<u32>,
571    max_lines: usize,
572) -> String {
573    if hunk.lines.is_empty() || max_lines == 0 {
574        return String::new();
575    }
576    let center = hunk
577        .lines
578        .iter()
579        .position(|line| {
580            old_line.is_some() && line.old_line == old_line
581                || new_line.is_some() && line.new_line == new_line
582        })
583        .unwrap_or(0);
584    let half_window = max_lines / 2;
585    let mut start = center.saturating_sub(half_window);
586    let end = (start + max_lines).min(hunk.lines.len());
587    if end - start < max_lines && end == hunk.lines.len() {
588        start = end.saturating_sub(max_lines);
589    }
590
591    let mut out = String::new();
592    for line in &hunk.lines[start..end] {
593        out.push_str("    ");
594        out.push_str(&line.raw);
595        out.push('\n');
596    }
597    out
598}
599
600fn resolve_workspace_path(path: &str) -> Option<PathBuf> {
601    let trimmed = path.trim();
602    if trimmed.is_empty() {
603        return None;
604    }
605
606    let candidate = if Path::new(trimmed).is_absolute() {
607        PathBuf::from(trimmed)
608    } else {
609        std::env::current_dir().ok()?.join(trimmed)
610    };
611    if !candidate.is_file() {
612        return None;
613    }
614    Some(candidate)
615}
616
617fn file_line_snippet(path: &Path, line: u32) -> Option<String> {
618    if line == 0 {
619        return None;
620    }
621    let text = std::fs::read_to_string(path).ok()?;
622    let lines: Vec<&str> = text.lines().collect();
623    let target = usize::try_from(line.saturating_sub(1)).ok()?;
624    if target >= lines.len() {
625        return None;
626    }
627
628    let start = target.saturating_sub(2);
629    let end = (target + 3).min(lines.len());
630    let mut out = String::new();
631    for (idx, content) in lines[start..end].iter().enumerate() {
632        let absolute = start + idx + 1;
633        out.push_str(&format!("    {absolute:>5} | {content}\n"));
634    }
635    Some(out)
636}
637
638fn prompt_template(path: &str) -> &'static str {
639    AI_SESSION_PROMPTS_DIR
640        .get_file(path)
641        .unwrap_or_else(|| panic!("missing ai session prompt template: {path}"))
642        .contents_utf8()
643        .unwrap_or_else(|| panic!("invalid utf-8 in ai session prompt template: {path}"))
644}
645
646fn missing_comment_prompt(review_name: &str, comment_id: u64) -> String {
647    prompt_template("comment_not_found.md")
648        .replace("{review_name}", review_name)
649        .replace("{comment_id}", &comment_id.to_string())
650}
651
652async fn invoke_provider(
653    config: &AppConfig,
654    provider: AiProvider,
655    mode: AiSessionMode,
656    prompt: &str,
657    progress_sender: Option<mpsc::Sender<AiProgressEvent>>,
658) -> Result<ProviderInvocation> {
659    let provider_cfg = config.ai.provider_config(provider);
660    if provider_cfg.client.trim().is_empty() {
661        return Err(anyhow!(
662            "provider {} has no configured client in config.toml",
663            provider.as_str()
664        ));
665    }
666
667    let mut command = Command::new(&provider_cfg.client);
668    command.kill_on_drop(true);
669    let args = normalized_provider_args(provider, provider_cfg, mode);
670    command.args(&args);
671    let codex_output_path = codex_output_path(provider)?;
672    if let Some(path) = codex_output_path.as_ref() {
673        if !args.iter().any(|arg| arg == "--json") {
674            command.arg("--json");
675        }
676        command.arg("--output-last-message");
677        command.arg(path);
678    }
679    let configured_model = provider_cfg
680        .model
681        .as_deref()
682        .map(str::trim)
683        .filter(|value| !value.is_empty())
684        .map(str::to_string);
685    if let Some(model_value) = configured_model.as_deref() {
686        match provider_cfg.model_arg.as_deref().map(str::trim) {
687            Some(model_arg) if !model_arg.is_empty() => {
688                command.arg(model_arg);
689                command.arg(model_value);
690            }
691            _ => {
692                command.arg(model_value);
693            }
694        }
695    }
696    command.stdout(Stdio::piped()).stderr(Stdio::piped());
697
698    let prompt_transport = normalized_prompt_transport(provider, &provider_cfg.prompt_transport);
699    match prompt_transport {
700        PromptTransport::Stdin => {
701            command.stdin(Stdio::piped());
702        }
703        PromptTransport::Argv => {
704            command.arg(prompt);
705            command.stdin(Stdio::null());
706        }
707    }
708
709    let mut child = command
710        .spawn()
711        .with_context(|| format!("failed to start provider client '{}'", provider_cfg.client))?;
712    debug!(
713        provider = %provider.as_str(),
714        client = %provider_cfg.client,
715        prompt_chars = prompt.chars().count(),
716        "provider process spawned"
717    );
718    emit_progress(
719        progress_sender.as_ref(),
720        provider,
721        "system",
722        format!(
723            "spawned {} (mode={}, transport={})",
724            provider_cfg.client,
725            mode.as_str(),
726            match prompt_transport {
727                PromptTransport::Stdin => "stdin",
728                PromptTransport::Argv => "argv",
729            }
730        ),
731    );
732
733    if matches!(prompt_transport, PromptTransport::Stdin)
734        && let Some(mut stdin) = child.stdin.take()
735    {
736        stdin
737            .write_all(prompt.as_bytes())
738            .await
739            .context("failed to send prompt to provider stdin")?;
740        stdin.flush().await.ok();
741    }
742
743    let stdout_task = child.stdout.take().map(|stdout| {
744        tokio::spawn(read_stream(
745            stdout,
746            provider,
747            "stdout",
748            progress_sender.clone(),
749        ))
750    });
751    let stderr_task = child.stderr.take().map(|stderr| {
752        tokio::spawn(read_stream(
753            stderr,
754            provider,
755            "stderr",
756            progress_sender.clone(),
757        ))
758    });
759
760    let timeout_seconds = effective_timeout_seconds(config, mode);
761    let wait_result = timeout(Duration::from_secs(timeout_seconds), child.wait()).await;
762    let mut timed_out = false;
763    let status = match wait_result {
764        Ok(Ok(status)) => Some(status),
765        Ok(Err(error)) => return Err(anyhow!("provider process wait failed: {error}")),
766        Err(_) => {
767            timed_out = true;
768            let _ = child.kill().await;
769            None
770        }
771    };
772
773    let stdout = collect_stream_output(stdout_task).await;
774    let stderr = collect_stream_output(stderr_task).await;
775    let stderr_trimmed = stderr.trim().to_string();
776    let maybe_codex_reply = read_codex_output_last_message(codex_output_path.as_deref()).await?;
777
778    if timed_out {
779        let reply = maybe_codex_reply
780            .as_deref()
781            .unwrap_or(stdout.trim())
782            .trim()
783            .to_string();
784        if !reply.is_empty() {
785            warn!(
786                provider = %provider.as_str(),
787                mode = %mode.as_str(),
788                timeout_seconds,
789                "provider timed out but returned partial output"
790            );
791            emit_progress(
792                progress_sender.as_ref(),
793                provider,
794                "system",
795                format!("timeout after {timeout_seconds}s, returning partial output"),
796            );
797            return Ok(ProviderInvocation {
798                reply,
799                model: detect_runtime_model(provider, &stdout, &stderr)
800                    .or(configured_model.clone()),
801            });
802        }
803
804        emit_progress(
805            progress_sender.as_ref(),
806            provider,
807            "system",
808            format!("timeout after {timeout_seconds}s with no output"),
809        );
810        return Err(anyhow!(
811            "provider {} timed out after {}s{}",
812            provider.as_str(),
813            timeout_seconds,
814            if stderr_trimmed.is_empty() {
815                "".to_string()
816            } else {
817                format!(": {stderr_trimmed}")
818            }
819        ));
820    }
821    let status = status.expect("status is present when not timed out");
822
823    if !status.success() {
824        warn!(
825            provider = %provider.as_str(),
826            status = %status,
827            stderr = %stderr_trimmed,
828            "provider exited with non-zero status"
829        );
830        emit_progress(
831            progress_sender.as_ref(),
832            provider,
833            "system",
834            format!("provider exited with {status}: {stderr_trimmed}"),
835        );
836        return Err(anyhow!(
837            "provider exited with {}: {}",
838            status,
839            if stderr_trimmed.is_empty() {
840                "no stderr output".to_string()
841            } else {
842                stderr_trimmed
843            }
844        ));
845    }
846
847    let reply = maybe_codex_reply.unwrap_or_else(|| stdout.trim().to_string());
848    if reply.is_empty() {
849        warn!(provider = %provider.as_str(), "provider returned empty output");
850        emit_progress(
851            progress_sender.as_ref(),
852            provider,
853            "system",
854            "provider returned empty output",
855        );
856        return Err(anyhow!("provider returned empty output"));
857    }
858
859    emit_progress(
860        progress_sender.as_ref(),
861        provider,
862        "system",
863        "provider completed successfully",
864    );
865    Ok(ProviderInvocation {
866        reply,
867        model: detect_runtime_model(provider, &stdout, &stderr).or(configured_model),
868    })
869}
870
871fn format_ai_reply_body(model: Option<&str>, reply: &str) -> String {
872    let mut out = String::new();
873    if let Some(model) = model.map(str::trim).filter(|value| !value.is_empty()) {
874        out.push_str(&format!("Model: {model}\n\n"));
875    }
876    out.push_str(reply.trim_end());
877    out
878}
879
880fn detect_runtime_model(provider: AiProvider, stdout: &str, stderr: &str) -> Option<String> {
881    match provider {
882        AiProvider::Codex => detect_model_from_json_stream(stdout)
883            .or_else(|| detect_model_from_json_stream(stderr))
884            .or_else(|| detect_model_from_text(stdout))
885            .or_else(|| detect_model_from_text(stderr)),
886        AiProvider::Claude | AiProvider::Opencode => {
887            detect_model_from_text(stdout).or_else(|| detect_model_from_text(stderr))
888        }
889    }
890}
891
892fn detect_model_from_json_stream(stream: &str) -> Option<String> {
893    for line in stream.lines() {
894        let trimmed = line.trim();
895        if trimmed.is_empty() || !trimmed.starts_with('{') {
896            continue;
897        }
898        let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
899            continue;
900        };
901        if let Some(model) = extract_model_from_json(&value) {
902            return Some(model);
903        }
904    }
905    None
906}
907
908fn extract_model_from_json(value: &Value) -> Option<String> {
909    match value {
910        Value::Object(map) => {
911            for key in [
912                "model",
913                "model_id",
914                "model_slug",
915                "resolved_model",
916                "selected_model",
917            ] {
918                if let Some(Value::String(found)) = map.get(key) {
919                    let trimmed = found.trim();
920                    if !trimmed.is_empty() {
921                        return Some(trimmed.to_string());
922                    }
923                }
924            }
925            for nested in map.values() {
926                if let Some(found) = extract_model_from_json(nested) {
927                    return Some(found);
928                }
929            }
930            None
931        }
932        Value::Array(items) => {
933            for item in items {
934                if let Some(found) = extract_model_from_json(item) {
935                    return Some(found);
936                }
937            }
938            None
939        }
940        _ => None,
941    }
942}
943
944fn detect_model_from_text(text: &str) -> Option<String> {
945    for line in text.lines() {
946        if let Some(value) = extract_model_after_marker(line, "model:") {
947            return Some(value);
948        }
949        if let Some(value) = extract_model_after_marker(line, "model=") {
950            return Some(value);
951        }
952    }
953    None
954}
955
956fn extract_model_after_marker(line: &str, marker: &str) -> Option<String> {
957    let (_, right) = line.split_once(marker)?;
958    let candidate = right.split_whitespace().next().map(|value| {
959        value.trim_matches(|ch: char| ch == '"' || ch == '\'' || ch == ',' || ch == ';')
960    })?;
961    if candidate.is_empty() {
962        None
963    } else {
964        Some(candidate.to_string())
965    }
966}
967
968fn normalized_provider_args(
969    provider: AiProvider,
970    provider_cfg: &crate::domain::config::AiProviderConfig,
971    mode: AiSessionMode,
972) -> Vec<String> {
973    let mut args = provider_cfg.args.clone();
974    match provider {
975        AiProvider::Codex => {
976            if !args.first().map(|value| value == "exec").unwrap_or(false) {
977                args.insert(0, "exec".to_string());
978            }
979            if !args.iter().any(|arg| arg == "--full-auto") {
980                args.push("--full-auto".to_string());
981            }
982            let has_sandbox_flag = args.iter().any(|arg| arg == "--sandbox" || arg == "-s");
983            if !has_sandbox_flag {
984                args.push("--sandbox".to_string());
985                args.push(match mode {
986                    AiSessionMode::Reply => "read-only".to_string(),
987                    AiSessionMode::Refactor => "workspace-write".to_string(),
988                });
989            }
990        }
991        AiProvider::Claude => {
992            if !args.iter().any(|arg| arg == "-p" || arg == "--print") {
993                args.insert(0, "-p".to_string());
994            }
995        }
996        AiProvider::Opencode => {
997            if !args.first().map(|value| value == "run").unwrap_or(false) {
998                args.insert(0, "run".to_string());
999            }
1000        }
1001    }
1002    args
1003}
1004
1005fn codex_output_path(provider: AiProvider) -> Result<Option<std::path::PathBuf>> {
1006    if !matches!(provider, AiProvider::Codex) {
1007        return Ok(None);
1008    }
1009    let file = format!("parley-codex-last-{}-{}.txt", now_ms()?, std::process::id());
1010    Ok(Some(std::env::temp_dir().join(file)))
1011}
1012
1013async fn read_codex_output_last_message(path: Option<&std::path::Path>) -> Result<Option<String>> {
1014    let Some(path) = path else {
1015        return Ok(None);
1016    };
1017    let text = match fs::read_to_string(path).await {
1018        Ok(content) => content.trim().to_string(),
1019        Err(_) => String::new(),
1020    };
1021    let _ = fs::remove_file(path).await;
1022    if text.is_empty() {
1023        Ok(None)
1024    } else {
1025        Ok(Some(text))
1026    }
1027}
1028
1029async fn read_stream<R>(
1030    reader: R,
1031    provider: AiProvider,
1032    stream: &'static str,
1033    progress_sender: Option<mpsc::Sender<AiProgressEvent>>,
1034) -> String
1035where
1036    R: AsyncRead + Unpin + Send + 'static,
1037{
1038    let mut lines = BufReader::new(reader).lines();
1039    let mut out = String::new();
1040    while let Ok(Some(line)) = lines.next_line().await {
1041        info!(provider = %provider.as_str(), stream, payload = %line, "provider_stream");
1042        emit_progress(progress_sender.as_ref(), provider, stream, line.as_str());
1043        out.push_str(&line);
1044        out.push('\n');
1045    }
1046    out
1047}
1048
1049async fn collect_stream_output(task: Option<JoinHandle<String>>) -> String {
1050    let Some(task) = task else {
1051        return String::new();
1052    };
1053    match task.await {
1054        Ok(content) => content,
1055        Err(error) => format!("<stream task join failed: {error}>"),
1056    }
1057}
1058
1059fn normalized_prompt_transport(
1060    provider: AiProvider,
1061    configured: &PromptTransport,
1062) -> PromptTransport {
1063    let _ = configured;
1064    match provider {
1065        // Prefer explicit prompt argv for deterministic headless execution.
1066        AiProvider::Codex | AiProvider::Claude | AiProvider::Opencode => PromptTransport::Argv,
1067    }
1068}
1069
1070fn emit_progress(
1071    progress_sender: Option<&mpsc::Sender<AiProgressEvent>>,
1072    provider: AiProvider,
1073    stream: &str,
1074    message: impl Into<String>,
1075) {
1076    let Some(progress_sender) = progress_sender else {
1077        return;
1078    };
1079    let timestamp_ms = std::time::SystemTime::now()
1080        .duration_since(std::time::UNIX_EPOCH)
1081        .map(|elapsed| elapsed.as_millis() as u64)
1082        .unwrap_or(0);
1083    let _ = progress_sender.send(AiProgressEvent {
1084        timestamp_ms,
1085        provider: provider.as_str().to_string(),
1086        stream: stream.to_string(),
1087        message: message.into(),
1088    });
1089}
1090
1091fn comment_is_targetable(status: CommentStatus, mode: AiSessionMode) -> bool {
1092    match mode {
1093        AiSessionMode::Reply => {
1094            matches!(status, CommentStatus::Open | CommentStatus::Pending)
1095        }
1096        AiSessionMode::Refactor => matches!(status, CommentStatus::Open),
1097    }
1098}
1099
1100fn effective_timeout_seconds(config: &AppConfig, mode: AiSessionMode) -> u64 {
1101    let configured = config.ai.timeout_seconds.max(1);
1102    match mode {
1103        AiSessionMode::Reply => configured,
1104        // Refactor mode can involve tool execution and file edits; keep a higher floor.
1105        AiSessionMode::Refactor => configured.max(600),
1106    }
1107}
1108
1109fn now_ms() -> Result<u64> {
1110    let elapsed = std::time::SystemTime::now()
1111        .duration_since(std::time::UNIX_EPOCH)
1112        .context("system clock is before unix epoch")?;
1113    Ok(elapsed.as_millis() as u64)
1114}
1115
1116#[cfg(test)]
1117mod tests {
1118    use super::{
1119        choose_best_hunk, comment_is_targetable, detect_model_from_json_stream,
1120        detect_model_from_text, format_ai_reply_body, format_hunk_excerpt, hunk_distance_to_anchor,
1121    };
1122    use crate::domain::ai::AiSessionMode;
1123    use crate::domain::diff::{DiffFile, DiffHunk, DiffLine, DiffLineKind};
1124    use crate::domain::review::CommentStatus;
1125
1126    #[test]
1127    fn reply_mode_excludes_addressed_threads() {
1128        assert!(comment_is_targetable(
1129            CommentStatus::Open,
1130            AiSessionMode::Reply
1131        ));
1132        assert!(comment_is_targetable(
1133            CommentStatus::Pending,
1134            AiSessionMode::Reply
1135        ));
1136        assert!(!comment_is_targetable(
1137            CommentStatus::Addressed,
1138            AiSessionMode::Reply
1139        ));
1140    }
1141
1142    #[test]
1143    fn refactor_mode_targets_only_open_threads() {
1144        assert!(comment_is_targetable(
1145            CommentStatus::Open,
1146            AiSessionMode::Refactor
1147        ));
1148        assert!(!comment_is_targetable(
1149            CommentStatus::Pending,
1150            AiSessionMode::Refactor
1151        ));
1152        assert!(!comment_is_targetable(
1153            CommentStatus::Addressed,
1154            AiSessionMode::Refactor
1155        ));
1156    }
1157
1158    #[test]
1159    fn choose_best_hunk_prefers_exact_anchor_match() {
1160        let file = DiffFile {
1161            path: "src/lib.rs".to_string(),
1162            header_lines: Vec::new(),
1163            hunks: vec![
1164                make_hunk(
1165                    "@@ -1,3 +1,3 @@",
1166                    1,
1167                    1,
1168                    vec![line_ctx(1, 1), line_ctx(2, 2)],
1169                ),
1170                make_hunk(
1171                    "@@ -40,3 +40,3 @@",
1172                    40,
1173                    40,
1174                    vec![line_ctx(40, 40), line_ctx(41, 41)],
1175                ),
1176            ],
1177        };
1178
1179        let chosen = choose_best_hunk(&file, None, Some(41)).expect("hunk should be selected");
1180        assert_eq!(chosen.new_start, 40);
1181    }
1182
1183    #[test]
1184    fn choose_best_hunk_falls_back_to_nearest_start() {
1185        let file = DiffFile {
1186            path: "src/lib.rs".to_string(),
1187            header_lines: Vec::new(),
1188            hunks: vec![
1189                make_hunk("@@ -10,2 +10,2 @@", 10, 10, vec![line_ctx(10, 10)]),
1190                make_hunk("@@ -80,2 +80,2 @@", 80, 80, vec![line_ctx(80, 80)]),
1191            ],
1192        };
1193
1194        let chosen = choose_best_hunk(&file, None, Some(74)).expect("hunk should be selected");
1195        assert_eq!(chosen.new_start, 80);
1196        assert!(hunk_distance_to_anchor(chosen, None, Some(74)) < 10);
1197    }
1198
1199    #[test]
1200    fn hunk_excerpt_contains_anchor_line() {
1201        let hunk = make_hunk(
1202            "@@ -20,4 +20,4 @@",
1203            20,
1204            20,
1205            vec![
1206                line_ctx(20, 20),
1207                line_add(0, 21, "+let value = 1;"),
1208                line_ctx(22, 22),
1209            ],
1210        );
1211        let excerpt = format_hunk_excerpt(&hunk, None, Some(21), 8);
1212        assert!(excerpt.contains("+let value = 1;"));
1213        assert!(excerpt.contains("@@ -20,4 +20,4 @@"));
1214    }
1215
1216    #[test]
1217    fn ai_reply_body_includes_model_header() {
1218        let body = format_ai_reply_body(Some("gpt-5.4"), "Implemented fix.");
1219        assert!(body.starts_with("Model: gpt-5.4"));
1220        assert!(body.contains("Implemented fix."));
1221    }
1222
1223    #[test]
1224    fn ai_reply_body_omits_header_when_model_unknown() {
1225        let body = format_ai_reply_body(None, "Implemented fix.");
1226        assert_eq!(body, "Implemented fix.");
1227    }
1228
1229    #[test]
1230    fn detect_model_from_json_stream_reads_nested_model_slug() {
1231        let stream = r#"{"event":"meta","payload":{"session":{"model_slug":"gpt-5.4"}}}"#;
1232        let detected = detect_model_from_json_stream(stream).expect("model should be detected");
1233        assert_eq!(detected, "gpt-5.4");
1234    }
1235
1236    #[test]
1237    fn detect_model_from_text_reads_model_marker() {
1238        let detected =
1239            detect_model_from_text("run complete; model=gpt-5.4; tokens=100").expect("model");
1240        assert_eq!(detected, "gpt-5.4");
1241    }
1242
1243    fn make_hunk(
1244        header: &str,
1245        old_start: u32,
1246        new_start: u32,
1247        mut extra: Vec<DiffLine>,
1248    ) -> DiffHunk {
1249        let mut lines = vec![DiffLine {
1250            kind: DiffLineKind::HunkHeader,
1251            old_line: None,
1252            new_line: None,
1253            raw: header.to_string(),
1254            code: header.to_string(),
1255        }];
1256        lines.append(&mut extra);
1257        DiffHunk {
1258            old_start,
1259            old_count: 1,
1260            new_start,
1261            new_count: 1,
1262            header: header.to_string(),
1263            lines,
1264        }
1265    }
1266
1267    fn line_ctx(old: u32, new: u32) -> DiffLine {
1268        DiffLine {
1269            kind: DiffLineKind::Context,
1270            old_line: Some(old),
1271            new_line: Some(new),
1272            raw: format!(" context {old}:{new}"),
1273            code: format!("context {old}:{new}"),
1274        }
1275    }
1276
1277    fn line_add(old: u32, new: u32, raw: &str) -> DiffLine {
1278        DiffLine {
1279            kind: DiffLineKind::Added,
1280            old_line: if old == 0 { None } else { Some(old) },
1281            new_line: Some(new),
1282            raw: raw.to_string(),
1283            code: raw.trim_start_matches('+').to_string(),
1284        }
1285    }
1286}