Skip to main content

embacle/
copilot_headless.rs

1// ABOUTME: CopilotHeadlessRunner wraps the copilot CLI via ACP (Agent Client Protocol) for LLM completions.
2// ABOUTME: Spawns copilot --acp per request and communicates via NDJSON-framed JSON-RPC over stdio.
3//
4// SPDX-License-Identifier: Apache-2.0
5// Copyright (c) 2026 dravr.ai
6
7use std::path::PathBuf;
8
9use agent_client_protocol_schema as schema;
10use async_trait::async_trait;
11use serde_json::{json, Value};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
13use tokio::process::{Child, ChildStdin, ChildStdout};
14use tokio::sync::mpsc;
15use tracing::{debug, warn};
16
17use crate::copilot::{copilot_fallback_models, discover_copilot_models};
18use crate::copilot_headless_config::{CopilotHeadlessConfig, PermissionPolicy};
19use crate::types::{
20    ChatRequest, ChatResponse, ChatStream, LlmCapabilities, LlmProvider, MessageRole, RunnerError,
21    StreamChunk, TokenUsage,
22};
23
/// Upper bound on how long a single ACP prompt turn may run: five minutes.
///
/// A headless Copilot turn can chain several tool-calling steps, so the budget
/// is deliberately larger than the default CLI runner timeout (120s).
const ACP_PROMPT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5 * 60);
29
30// ---------------------------------------------------------------------------
31// NDJSON transport
32// ---------------------------------------------------------------------------
33
34/// Async NDJSON transport for ACP JSON-RPC communication.
35///
36/// Handles reading/writing newline-delimited JSON messages over stdio pipes.
37/// Each message is a single JSON line terminated by `\n`.
38struct AcpTransport {
39    writer: BufWriter<ChildStdin>,
40    reader: BufReader<ChildStdout>,
41    next_id: i64,
42}
43
44impl AcpTransport {
45    fn new(stdin: ChildStdin, stdout: ChildStdout) -> Self {
46        Self {
47            writer: BufWriter::new(stdin),
48            reader: BufReader::new(stdout),
49            next_id: 1,
50        }
51    }
52
53    /// Send a JSON-RPC request and return its id.
54    async fn send_request(&mut self, method: &str, params: Value) -> Result<i64, RunnerError> {
55        let id = self.next_id;
56        self.next_id += 1;
57
58        let msg = json!({
59            "jsonrpc": "2.0",
60            "id": id,
61            "method": method,
62            "params": params,
63        });
64
65        self.write_message(&msg).await?;
66        Ok(id)
67    }
68
69    /// Send a JSON-RPC response (for server-to-client requests like permission).
70    async fn send_response(&mut self, id: &Value, result: Value) -> Result<(), RunnerError> {
71        let msg = json!({
72            "jsonrpc": "2.0",
73            "id": id,
74            "result": result,
75        });
76        self.write_message(&msg).await
77    }
78
79    /// Write a single NDJSON message.
80    async fn write_message(&mut self, msg: &Value) -> Result<(), RunnerError> {
81        let line = serde_json::to_string(msg)
82            .map_err(|e| RunnerError::internal(format!("JSON serialization failed: {e}")))?;
83        self.writer
84            .write_all(line.as_bytes())
85            .await
86            .map_err(|e| RunnerError::internal(format!("Write failed: {e}")))?;
87        self.writer
88            .write_all(b"\n")
89            .await
90            .map_err(|e| RunnerError::internal(format!("Write newline failed: {e}")))?;
91        self.writer
92            .flush()
93            .await
94            .map_err(|e| RunnerError::internal(format!("Flush failed: {e}")))?;
95        Ok(())
96    }
97
98    /// Read the next NDJSON message, skipping blank lines.
99    async fn read_message(&mut self) -> Result<Value, RunnerError> {
100        let mut line = String::new();
101        loop {
102            line.clear();
103            let n = self
104                .reader
105                .read_line(&mut line)
106                .await
107                .map_err(|e| RunnerError::internal(format!("Read failed: {e}")))?;
108            if n == 0 {
109                return Err(RunnerError::internal("ACP connection closed unexpectedly"));
110            }
111            let trimmed = line.trim();
112            if trimmed.is_empty() {
113                continue;
114            }
115            return serde_json::from_str(trimmed)
116                .map_err(|e| RunnerError::internal(format!("JSON parse failed: {e}")));
117        }
118    }
119
120    /// Read messages until we get the response matching the given request id.
121    ///
122    /// Non-matching messages (notifications, other responses) are skipped.
123    async fn read_response(&mut self, expected_id: i64) -> Result<Value, RunnerError> {
124        loop {
125            let msg = self.read_message().await?;
126            if msg.get("id").and_then(Value::as_i64) == Some(expected_id) {
127                if let Some(error) = msg.get("error") {
128                    return Err(RunnerError::external_service(
129                        "copilot-acp",
130                        format!("RPC error: {error}"),
131                    ));
132                }
133                return Ok(msg.get("result").cloned().unwrap_or(Value::Null));
134            }
135        }
136    }
137}
138
139// ---------------------------------------------------------------------------
140// ACP session lifecycle
141// ---------------------------------------------------------------------------
142
143/// Spawn the copilot --acp subprocess with piped stdio.
144fn spawn_copilot(cli_path: &PathBuf, github_token: Option<&str>) -> Result<Child, RunnerError> {
145    let mut cmd = tokio::process::Command::new(cli_path);
146    cmd.arg("--acp")
147        .stdin(std::process::Stdio::piped())
148        .stdout(std::process::Stdio::piped())
149        .stderr(std::process::Stdio::null());
150
151    if let Some(token) = github_token {
152        cmd.env("COPILOT_GITHUB_TOKEN", token);
153    }
154
155    cmd.spawn()
156        .map_err(|e| RunnerError::internal(format!("Failed to spawn copilot --acp: {e}")))
157}
158
159/// Initialize ACP connection and create a session.
160///
161/// Returns the transport and session id ready for prompting.
162async fn setup_session(
163    cli_path: &PathBuf,
164    github_token: Option<&str>,
165    model: &str,
166    system_prompt: Option<&str>,
167) -> Result<(AcpTransport, Child, String), RunnerError> {
168    let mut child = spawn_copilot(cli_path, github_token)?;
169
170    let stdin = child
171        .stdin
172        .take()
173        .ok_or_else(|| RunnerError::internal("Failed to capture copilot stdin"))?;
174    let stdout = child
175        .stdout
176        .take()
177        .ok_or_else(|| RunnerError::internal("Failed to capture copilot stdout"))?;
178
179    let mut transport = AcpTransport::new(stdin, stdout);
180
181    // Initialize handshake
182    let init_id = transport
183        .send_request(
184            "initialize",
185            json!({
186                "protocolVersion": 1,
187                "clientInfo": {
188                    "name": "embacle",
189                    "version": env!("CARGO_PKG_VERSION"),
190                },
191                "capabilities": {},
192            }),
193        )
194        .await?;
195    transport.read_response(init_id).await?;
196
197    // Create session with model and optional system prompt
198    let mut session_params = json!({
199        "model": model,
200        "cwd": std::env::current_dir()
201            .map_err(|e| RunnerError::internal(format!("Failed to get cwd: {e}")))?,
202        "mcpServers": [],
203    });
204    if let Some(sys) = system_prompt {
205        session_params["systemPrompt"] = Value::String(sys.to_owned());
206    }
207
208    let session_id_req = transport
209        .send_request("session/new", session_params)
210        .await?;
211    let session_result = transport.read_response(session_id_req).await?;
212
213    let session_id = session_result
214        .get("sessionId")
215        .and_then(Value::as_str)
216        .ok_or_else(|| {
217            RunnerError::external_service("copilot-acp", "Missing sessionId in response")
218        })?
219        .to_owned();
220
221    debug!(session_id = %session_id, model = %model, "ACP session created");
222    Ok((transport, child, session_id))
223}
224
225// ---------------------------------------------------------------------------
226// Notification and permission handling
227// ---------------------------------------------------------------------------
228
229/// Accumulated state from ACP session notifications during a prompt turn.
230struct TurnAccumulator {
231    content: String,
232    tool_calls: Vec<ObservedToolCall>,
233}
234
235impl TurnAccumulator {
236    const fn new() -> Self {
237        Self {
238            content: String::new(),
239            tool_calls: Vec::new(),
240        }
241    }
242}
243
244/// Process a session/update notification, accumulating content and tool calls.
245fn process_notification(params: &Value, acc: &mut TurnAccumulator) {
246    let Some(params) = params.get("params").or(Some(params)) else {
247        return;
248    };
249
250    let Ok(notif) = serde_json::from_value::<schema::SessionNotification>(params.clone()) else {
251        return;
252    };
253
254    match &notif.update {
255        schema::SessionUpdate::AgentMessageChunk(chunk) => {
256            if let schema::ContentBlock::Text(text) = &chunk.content {
257                acc.content.push_str(&text.text);
258            }
259        }
260        schema::SessionUpdate::ToolCall(tc) => {
261            acc.tool_calls.push(ObservedToolCall {
262                id: tc.tool_call_id.0.to_string(),
263                title: tc.title.clone(),
264                status: format!("{:?}", tc.status),
265            });
266        }
267        schema::SessionUpdate::ToolCallUpdate(update) => {
268            let update_id = update.tool_call_id.0.to_string();
269            if let Some(existing) = acc.tool_calls.iter_mut().find(|t| t.id == update_id) {
270                if let Some(ref title) = update.fields.title {
271                    existing.title.clone_from(title);
272                }
273                if let Some(ref status) = update.fields.status {
274                    existing.status = format!("{status:?}");
275                }
276            }
277        }
278        _ => {}
279    }
280}
281
282/// Build a permission response based on the configured policy.
283///
284/// With `AutoApprove`: selects `AllowAlways` over `AllowOnce`. If no allow option
285/// exists, cancels the request instead of falling back to a reject option.
286/// With `DenyAll`: always cancels the request.
287fn build_permission_response(params: &Value, policy: PermissionPolicy) -> Value {
288    if policy == PermissionPolicy::DenyAll {
289        debug!("Permission policy is DenyAll, cancelling");
290        return json!({ "outcome": "cancelled" });
291    }
292
293    let Ok(req) = serde_json::from_value::<schema::RequestPermissionRequest>(params.clone()) else {
294        warn!("Failed to parse permission request, cancelling");
295        return json!({ "outcome": "cancelled" });
296    };
297
298    // Prefer AllowAlways over AllowOnce for fewer repeated prompts
299    let option_id = req
300        .options
301        .iter()
302        .find(|o| matches!(o.kind, schema::PermissionOptionKind::AllowAlways))
303        .or_else(|| {
304            req.options
305                .iter()
306                .find(|o| matches!(o.kind, schema::PermissionOptionKind::AllowOnce))
307        })
308        .map(|o| &o.option_id);
309
310    option_id.map_or_else(
311        || {
312            warn!("Permission request had no allow options, cancelling");
313            json!({ "outcome": "cancelled" })
314        },
315        |id| {
316            debug!(?id, "Auto-approving permission request");
317            json!({ "outcome": { "optionId": id.0 } })
318        },
319    )
320}
321
322/// Extract token usage from the prompt response JSON.
323///
324/// ACP returns usage at `/result/usage` with camelCase fields:
325/// `totalTokens`, `inputTokens`, `outputTokens`.
326fn extract_usage(result: &Value) -> Option<TokenUsage> {
327    let usage = result
328        .pointer("/result/usage")
329        .or_else(|| result.get("usage"))?;
330
331    let input = usage.get("inputTokens").and_then(Value::as_u64)?;
332    let output = usage.get("outputTokens").and_then(Value::as_u64)?;
333    let total = usage
334        .get("totalTokens")
335        .and_then(Value::as_u64)
336        .unwrap_or(input + output);
337
338    #[allow(clippy::cast_possible_truncation)]
339    Some(TokenUsage {
340        prompt_tokens: input as u32,
341        completion_tokens: output as u32,
342        total_tokens: total as u32,
343    })
344}
345
/// Translate an ACP stop reason into the finish-reason string used in responses.
///
/// Known reasons are looked up in a small table; anything else (including
/// `end_turn`) maps to `"stop"`.
fn map_stop_reason(reason: &str) -> &'static str {
    const KNOWN_REASONS: [(&str, &str); 4] = [
        ("max_tokens", "length"),
        ("max_turn_requests", "max_turns"),
        ("refusal", "refusal"),
        ("cancelled", "cancelled"),
    ];

    KNOWN_REASONS
        .iter()
        .find(|entry| entry.0 == reason)
        .map_or("stop", |entry| entry.1)
}
355
356// ---------------------------------------------------------------------------
357// Message collection loops
358// ---------------------------------------------------------------------------
359
360/// Read messages until prompt completes, collecting all content.
361async fn collect_complete(
362    transport: &mut AcpTransport,
363    prompt_id: i64,
364    model: String,
365    policy: PermissionPolicy,
366) -> Result<(ChatResponse, Vec<ObservedToolCall>), RunnerError> {
367    let mut acc = TurnAccumulator::new();
368
369    loop {
370        let msg = transport.read_message().await?;
371
372        // Prompt response — the turn is complete
373        if msg.get("id").and_then(Value::as_i64) == Some(prompt_id) {
374            if let Some(error) = msg.get("error") {
375                return Err(RunnerError::external_service(
376                    "copilot-acp",
377                    format!("Prompt failed: {error}"),
378                ));
379            }
380
381            let stop_reason = msg
382                .pointer("/result/stopReason")
383                .and_then(Value::as_str)
384                .unwrap_or("end_turn");
385
386            let usage = extract_usage(&msg);
387
388            debug!(
389                content_len = acc.content.len(),
390                tool_calls = acc.tool_calls.len(),
391                model = %model,
392                has_usage = usage.is_some(),
393                "Copilot Headless complete() response"
394            );
395
396            let response = ChatResponse {
397                content: acc.content,
398                model,
399                usage,
400                finish_reason: Some(map_stop_reason(stop_reason).to_owned()),
401                warnings: None,
402                tool_calls: None,
403            };
404
405            return Ok((response, acc.tool_calls));
406        }
407
408        // Server requests and notifications
409        handle_server_message(&msg, transport, &mut acc, policy).await?;
410    }
411}
412
413/// Read messages until prompt completes, streaming chunks via channel.
414async fn collect_streaming(
415    transport: &mut AcpTransport,
416    prompt_id: i64,
417    chunk_tx: &mpsc::UnboundedSender<Result<StreamChunk, RunnerError>>,
418    policy: PermissionPolicy,
419) -> Result<(), RunnerError> {
420    let mut acc = TurnAccumulator::new();
421
422    loop {
423        let msg = transport.read_message().await?;
424
425        // Prompt response — the turn is complete
426        if msg.get("id").and_then(Value::as_i64) == Some(prompt_id) {
427            if let Some(error) = msg.get("error") {
428                return Err(RunnerError::external_service(
429                    "copilot-acp",
430                    format!("Prompt failed: {error}"),
431                ));
432            }
433
434            let stop_reason = msg
435                .pointer("/result/stopReason")
436                .and_then(Value::as_str)
437                .unwrap_or("end_turn");
438
439            let _ = chunk_tx.send(Ok(StreamChunk {
440                delta: String::new(),
441                is_final: true,
442                finish_reason: Some(map_stop_reason(stop_reason).to_owned()),
443            }));
444
445            return Ok(());
446        }
447
448        // Server requests and notifications
449        if let Some(method) = msg.get("method").and_then(Value::as_str) {
450            match method {
451                "session/update" => {
452                    if let Some(params) = msg.get("params") {
453                        // Try to extract text delta for streaming
454                        if let Ok(notif) =
455                            serde_json::from_value::<schema::SessionNotification>(params.clone())
456                        {
457                            if let schema::SessionUpdate::AgentMessageChunk(chunk) = &notif.update {
458                                if let schema::ContentBlock::Text(text) = &chunk.content {
459                                    let _ = chunk_tx.send(Ok(StreamChunk {
460                                        delta: text.text.clone(),
461                                        is_final: false,
462                                        finish_reason: None,
463                                    }));
464                                }
465                            }
466                        }
467                        // Also track tool calls for internal accounting
468                        process_notification(params, &mut acc);
469                    }
470                }
471                "session/request_permission" => {
472                    if let (Some(id), Some(params)) = (msg.get("id"), msg.get("params")) {
473                        let response = build_permission_response(params, policy);
474                        transport.send_response(id, response).await?;
475                    }
476                }
477                _ => {}
478            }
479        }
480    }
481}
482
483/// Handle a server-to-client message (notification or request).
484async fn handle_server_message(
485    msg: &Value,
486    transport: &mut AcpTransport,
487    acc: &mut TurnAccumulator,
488    policy: PermissionPolicy,
489) -> Result<(), RunnerError> {
490    if let Some(method) = msg.get("method").and_then(Value::as_str) {
491        match method {
492            "session/update" => {
493                if let Some(params) = msg.get("params") {
494                    process_notification(params, acc);
495                }
496            }
497            "session/request_permission" => {
498                if let (Some(id), Some(params)) = (msg.get("id"), msg.get("params")) {
499                    let response = build_permission_response(params, policy);
500                    transport.send_response(id, response).await?;
501                }
502            }
503            _ => {}
504        }
505    }
506    Ok(())
507}
508
509// ---------------------------------------------------------------------------
510// Public types
511// ---------------------------------------------------------------------------
512
/// One tool invocation seen while an ACP session turn was running.
#[derive(Clone, Debug)]
pub struct ObservedToolCall {
    /// Identifier the ACP protocol assigned to the tool call.
    pub id: String,
    /// Human-readable description of what the tool is doing.
    pub title: String,
    /// Debug rendering of the execution status (e.g., "Pending",
    /// "`InProgress`", "Completed", "Failed").
    pub status: String,
}
523
524/// Response from a headless conversation turn including tool execution metadata.
525#[derive(Debug, Clone)]
526pub struct HeadlessToolResponse {
527    /// Final assistant response content.
528    pub content: String,
529    /// Model that generated the response.
530    pub model: String,
531    /// Tool calls observed during the turn.
532    pub tool_calls: Vec<ObservedToolCall>,
533    /// Token usage for this turn.
534    pub usage: Option<TokenUsage>,
535    /// Finish reason.
536    pub finish_reason: Option<String>,
537}
538
539// ---------------------------------------------------------------------------
540// Public runner
541// ---------------------------------------------------------------------------
542
543/// GitHub Copilot Headless (ACP) LLM provider.
544///
545/// Communicates with `copilot --acp` via the Agent Client Protocol (JSON-RPC over stdio).
546/// Spawns a new copilot subprocess per request using NDJSON framing.
547/// Uses types from `agent-client-protocol-schema` for protocol message deserialization.
548///
549/// Copilot manages its own tool execution loop internally (`SDK_TOOL_CALLING`).
550/// Tool calls are observed and reported via [`HeadlessToolResponse`] from
551/// [`converse()`](Self::converse), but the caller does not need to execute tools.
552pub struct CopilotHeadlessRunner {
553    config: CopilotHeadlessConfig,
554    available_models: Vec<String>,
555}
556
557impl CopilotHeadlessRunner {
558    /// Create a new provider from environment configuration.
559    ///
560    /// Attempts to discover available models via `gh copilot models`.
561    /// Falls back to a static list if discovery fails.
562    pub async fn from_env() -> Self {
563        let available_models = discover_copilot_models()
564            .await
565            .unwrap_or_else(copilot_fallback_models);
566        Self {
567            config: CopilotHeadlessConfig::from_env(),
568            available_models,
569        }
570    }
571
572    /// Create a new provider with explicit configuration.
573    pub async fn with_config(config: CopilotHeadlessConfig) -> Self {
574        let available_models = discover_copilot_models()
575            .await
576            .unwrap_or_else(copilot_fallback_models);
577        Self {
578            config,
579            available_models,
580        }
581    }
582
583    /// Resolve the copilot CLI binary path.
584    fn resolve_cli_path(&self) -> Result<PathBuf, RunnerError> {
585        if let Some(ref path) = self.config.cli_path {
586            return Ok(path.clone());
587        }
588        which::which("copilot").map_err(|_| RunnerError::binary_not_found("copilot"))
589    }
590
591    /// Build ACP prompt content blocks from the last user message.
592    ///
593    /// Always includes a text block. When the last user message has images,
594    /// appends image blocks with `type: "image"`, `data`, and `mimeType`.
595    fn build_prompt_blocks(request: &ChatRequest) -> Vec<Value> {
596        let last_user = request
597            .messages
598            .iter()
599            .rev()
600            .find(|m| m.role == MessageRole::User);
601
602        let text = last_user.map(|m| m.content.as_str()).unwrap_or_default();
603
604        let mut blocks = vec![json!({"type": "text", "text": text})];
605
606        if let Some(images) = last_user.and_then(|m| m.images.as_ref()) {
607            for img in images {
608                blocks.push(json!({
609                    "type": "image",
610                    "data": img.data,
611                    "mimeType": img.mime_type,
612                }));
613            }
614        }
615
616        blocks
617    }
618
619    /// Extract the system prompt if present.
620    fn extract_system_prompt(request: &ChatRequest) -> Option<&str> {
621        request
622            .messages
623            .iter()
624            .find(|m| m.role == MessageRole::System)
625            .map(|m| m.content.as_str())
626    }
627
628    /// Run a conversation turn and return detailed results including tool call metadata.
629    ///
630    /// Unlike [`complete()`](LlmProvider::complete), this returns an [`HeadlessToolResponse`]
631    /// with observed tool calls that copilot executed internally during the turn.
632    pub async fn converse(
633        &self,
634        request: &ChatRequest,
635    ) -> Result<HeadlessToolResponse, RunnerError> {
636        let cli_path = self.resolve_cli_path()?;
637        let model = request
638            .model
639            .as_deref()
640            .unwrap_or(&self.config.model)
641            .to_owned();
642        let system_prompt = Self::extract_system_prompt(request);
643        let prompt_blocks = Self::build_prompt_blocks(request);
644
645        let (mut transport, mut child, session_id) = setup_session(
646            &cli_path,
647            self.config.github_token.as_deref(),
648            &model,
649            system_prompt,
650        )
651        .await?;
652
653        let prompt_id = transport
654            .send_request(
655                "session/prompt",
656                json!({
657                    "sessionId": session_id,
658                    "prompt": prompt_blocks,
659                }),
660            )
661            .await?;
662
663        let result = tokio::time::timeout(
664            ACP_PROMPT_TIMEOUT,
665            collect_complete(
666                &mut transport,
667                prompt_id,
668                model,
669                self.config.permission_policy,
670            ),
671        )
672        .await
673        .map_err(|_| {
674            RunnerError::timeout(format!(
675                "copilot-acp: prompt timed out after {}s",
676                ACP_PROMPT_TIMEOUT.as_secs()
677            ))
678        })?;
679        let _ = child.kill().await;
680
681        let (response, tool_calls) = result?;
682        Ok(HeadlessToolResponse {
683            content: response.content,
684            model: response.model,
685            tool_calls,
686            usage: response.usage,
687            finish_reason: response.finish_reason,
688        })
689    }
690}
691
692#[async_trait]
693impl LlmProvider for CopilotHeadlessRunner {
694    fn name(&self) -> &'static str {
695        "copilot_headless"
696    }
697
698    fn display_name(&self) -> &str {
699        "GitHub Copilot (Headless)"
700    }
701
702    fn capabilities(&self) -> LlmCapabilities {
703        LlmCapabilities::STREAMING
704            | LlmCapabilities::SYSTEM_MESSAGES
705            | LlmCapabilities::SDK_TOOL_CALLING
706            | LlmCapabilities::VISION
707    }
708
709    fn default_model(&self) -> &str {
710        &self.config.model
711    }
712
713    fn available_models(&self) -> &[String] {
714        &self.available_models
715    }
716
717    async fn complete(&self, request: &ChatRequest) -> Result<ChatResponse, RunnerError> {
718        let cli_path = self.resolve_cli_path()?;
719        let model = request
720            .model
721            .as_deref()
722            .unwrap_or(&self.config.model)
723            .to_owned();
724        let system_prompt = Self::extract_system_prompt(request);
725        let prompt_blocks = Self::build_prompt_blocks(request);
726
727        let (mut transport, mut child, session_id) = setup_session(
728            &cli_path,
729            self.config.github_token.as_deref(),
730            &model,
731            system_prompt,
732        )
733        .await?;
734
735        let prompt_id = transport
736            .send_request(
737                "session/prompt",
738                json!({
739                    "sessionId": session_id,
740                    "prompt": prompt_blocks,
741                }),
742            )
743            .await?;
744
745        let result = tokio::time::timeout(
746            ACP_PROMPT_TIMEOUT,
747            collect_complete(
748                &mut transport,
749                prompt_id,
750                model,
751                self.config.permission_policy,
752            ),
753        )
754        .await
755        .map_err(|_| {
756            RunnerError::timeout(format!(
757                "copilot-acp: prompt timed out after {}s",
758                ACP_PROMPT_TIMEOUT.as_secs()
759            ))
760        })?;
761        let _ = child.kill().await;
762        result.map(|(response, _tool_calls)| response)
763    }
764
765    async fn complete_stream(&self, request: &ChatRequest) -> Result<ChatStream, RunnerError> {
766        let cli_path = self.resolve_cli_path()?;
767        let model = request.model.as_deref().unwrap_or(&self.config.model);
768        let system_prompt = Self::extract_system_prompt(request).map(str::to_owned);
769        let prompt_blocks = Self::build_prompt_blocks(request);
770
771        let (mut transport, mut child, session_id) = setup_session(
772            &cli_path,
773            self.config.github_token.as_deref(),
774            model,
775            system_prompt.as_deref(),
776        )
777        .await?;
778
779        let prompt_id = transport
780            .send_request(
781                "session/prompt",
782                json!({
783                    "sessionId": session_id,
784                    "prompt": prompt_blocks,
785                }),
786            )
787            .await?;
788
789        let (chunk_tx, chunk_rx) = mpsc::unbounded_channel();
790        let policy = self.config.permission_policy;
791
792        tokio::spawn(async move {
793            let result = tokio::time::timeout(
794                ACP_PROMPT_TIMEOUT,
795                collect_streaming(&mut transport, prompt_id, &chunk_tx, policy),
796            )
797            .await;
798            match result {
799                Ok(Err(e)) => {
800                    let _ = chunk_tx.send(Err(e));
801                }
802                Err(_) => {
803                    let _ = chunk_tx.send(Err(RunnerError::timeout(format!(
804                        "copilot-acp: prompt timed out after {}s",
805                        ACP_PROMPT_TIMEOUT.as_secs()
806                    ))));
807                }
808                Ok(Ok(())) => {}
809            }
810            let _ = child.kill().await;
811        });
812
813        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(chunk_rx);
814        Ok(Box::pin(stream))
815    }
816
817    async fn health_check(&self) -> Result<bool, RunnerError> {
818        self.resolve_cli_path().map_or(Ok(false), |path| {
819            tracing::info!(cli_path = %path.display(), "Copilot Headless health check: binary found");
820            Ok(true)
821        })
822    }
823}
824
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ChatMessage;
    use serde_json::json;

    /// Build an ACP permission request whose options have the given kinds.
    ///
    /// Field names are camelCase to match the serde configuration of
    /// `agent-client-protocol-schema`. Kinds are the `snake_case` spellings of
    /// `PermissionOptionKind`: `allow_once`, `allow_always`, `reject_once`,
    /// `reject_always`. Option `i` gets the id `opt_{i}`.
    fn make_permission_params(kinds: &[&str]) -> Value {
        let mut options = Vec::with_capacity(kinds.len());
        for (index, kind) in kinds.iter().enumerate() {
            options.push(json!({
                "optionId": format!("opt_{index}"),
                "name": format!("Option {index}"),
                "kind": kind
            }));
        }

        json!({
            "sessionId": "test-session",
            "toolCall": {
                "toolCallId": "tc_1"
            },
            "options": options
        })
    }

    /// Run the permission builder under `AutoApprove` for the given option kinds.
    fn auto_approve(kinds: &[&str]) -> Value {
        build_permission_response(
            &make_permission_params(kinds),
            PermissionPolicy::AutoApprove,
        )
    }

    #[test]
    fn permission_only_reject_options_cancels() {
        let reply = auto_approve(&["reject_once", "reject_always"]);
        assert_eq!(reply["outcome"], "cancelled");
    }

    #[test]
    fn permission_prefers_allow_always_over_allow_once() {
        let reply = auto_approve(&["allow_once", "allow_always", "reject_once"]);
        // AllowAlways sits at index 1, so opt_1 must be chosen.
        assert_eq!(reply["outcome"]["optionId"].as_str(), Some("opt_1"));
    }

    #[test]
    fn permission_selects_allow_once_when_no_allow_always() {
        let reply = auto_approve(&["allow_once", "reject_once"]);
        assert_eq!(reply["outcome"]["optionId"].as_str(), Some("opt_0"));
    }

    #[test]
    fn permission_empty_options_cancels() {
        let reply = auto_approve(&[]);
        assert_eq!(reply["outcome"], "cancelled");
    }

    #[test]
    fn permission_deny_all_policy_always_cancels() {
        let params = make_permission_params(&["allow_once", "allow_always"]);
        let reply = build_permission_response(&params, PermissionPolicy::DenyAll);
        assert_eq!(reply["outcome"], "cancelled");
    }

    #[test]
    fn build_prompt_blocks_text_only() {
        let request = ChatRequest::new(vec![ChatMessage::user("Hello")]);
        let blocks = CopilotHeadlessRunner::build_prompt_blocks(&request);

        assert_eq!(blocks.len(), 1);
        let text_block = &blocks[0];
        assert_eq!(text_block["type"], "text");
        assert_eq!(text_block["text"], "Hello");
    }

    #[test]
    fn build_prompt_blocks_with_images() {
        use crate::types::ImagePart;

        let image = ImagePart::new("aGVsbG8=", "image/png").unwrap();
        let message = ChatMessage::user_with_images("Describe this image", vec![image]);
        let request = ChatRequest::new(vec![message]);
        let blocks = CopilotHeadlessRunner::build_prompt_blocks(&request);

        assert_eq!(blocks.len(), 2);

        let text_block = &blocks[0];
        assert_eq!(text_block["type"], "text");
        assert_eq!(text_block["text"], "Describe this image");

        let image_block = &blocks[1];
        assert_eq!(image_block["type"], "image");
        assert_eq!(image_block["data"], "aGVsbG8=");
        assert_eq!(image_block["mimeType"], "image/png");
    }

    #[test]
    fn build_prompt_blocks_uses_last_user_message() {
        let history = vec![
            ChatMessage::user("first"),
            ChatMessage::assistant("response"),
            ChatMessage::user("second"),
        ];
        let blocks = CopilotHeadlessRunner::build_prompt_blocks(&ChatRequest::new(history));
        assert_eq!(blocks[0]["text"], "second");
    }

    #[test]
    fn capabilities_include_vision() {
        // Mirrors the flag set returned by `capabilities()`; no runner is built here.
        let caps = LlmCapabilities::STREAMING
            | LlmCapabilities::SYSTEM_MESSAGES
            | LlmCapabilities::SDK_TOOL_CALLING
            | LlmCapabilities::VISION;
        assert!(caps.supports_vision());
    }
}