Skip to main content

agent_engine/runtime/openai/
stream.rs

//! Streaming paths for OpenAI-compatible providers.
//!
//! `call_oai_stream_inner` mirrors `ApiMethods::call_api_stream_inner` but speaks
//! OpenAI chat/completions; `call_codex_stream_inner` speaks the ChatGPT Codex
//! Responses API. Both translate back to Anthropic-shaped events for the rest of
//! the runtime.
5
6use super::translate;
7use super::types::{ChatMessage, OaiEvent, ProviderConfig, StreamOptions, ToolCall};
8use super::wire::StreamDecoder;
9use crate::runtime::types::StreamEvent;
10use futures::StreamExt;
11use serde_json::{json, Value};
12use tokio::sync::mpsc;
13
14/// Run a single streaming request against an OpenAI-compatible endpoint.
15///
16/// Returns the final assistant response as an Anthropic-shaped content Value
17/// (`{"content": [..text.., ..tool_use..]}`) so the outer agent loop can keep
18/// using the same handling as the native Anthropic path.
19#[allow(clippy::too_many_arguments)]
20pub(crate) async fn call_oai_stream_inner(
21    cfg: &ProviderConfig,
22    client: &reqwest::Client,
23    tools_schema: &[Value],
24    system_prompt: &Option<String>,
25    messages: &[Value],
26    tx: &mpsc::UnboundedSender<StreamEvent>,
27    temperature: Option<f32>,
28    max_tokens: Option<u32>,
29    thinking_budget: u32,
30    cancel: &tokio_util::sync::CancellationToken,
31) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
32    let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
33    let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
34    let tools_opt = if oai_tools.is_empty() { None } else { Some(oai_tools) };
35
36    // Google's OpenAI-compat endpoint rejects stream_options
37    let stream_options = if cfg.base_url.contains("googleapis.com") {
38        None
39    } else {
40        Some(StreamOptions { include_usage: true })
41    };
42
43    let mut body = serde_json::Map::new();
44    body.insert("model".to_string(), json!(cfg.model.clone()));
45    body.insert("messages".to_string(), serde_json::to_value(oai_messages)?);
46    body.insert("stream".to_string(), json!(true));
47    if let Some(stream_options) = stream_options {
48        body.insert("stream_options".to_string(), serde_json::to_value(stream_options)?);
49    }
50    if let Some(max_tokens) = max_tokens {
51        body.insert("max_tokens".to_string(), json!(max_tokens));
52    }
53    if let Some(temperature) = temperature {
54        body.insert("temperature".to_string(), json!(temperature));
55    }
56    if let Some(tools) = tools_opt {
57        body.insert("tools".to_string(), serde_json::to_value(tools)?);
58    }
59    super::reasoning::apply_openai_reasoning_params(
60        &mut body,
61        super::reasoning::provider_for_key(&cfg.provider),
62        &cfg.model,
63        thinking_budget,
64    );
65    let body = Value::Object(body);
66
67    let url = format!("{}/chat/completions", cfg.base_url.trim_end_matches('/'));
68
69    tracing::debug!(url=%url, model=%cfg.model, "openai stream request");
70
71    let resp = match client
72        .post(&url)
73        .bearer_auth(&cfg.api_key)
74        .header("content-type", "application/json")
75        .header("accept", "text/event-stream")
76        .json(&body)
77        .send()
78        .await
79    {
80        Ok(r) => r,
81        Err(e) => {
82            if e.is_connect() && url.contains("localhost") {
83                return Err(format!(
84                    "Can't reach local endpoint at {} — is Ollama/LM Studio running?",
85                    url
86                ).into());
87            }
88            return Err(e.into());
89        }
90    };
91
92    if !resp.status().is_success() {
93        let status = resp.status();
94        let text = resp.text().await.unwrap_or_default();
95        return Err(format!("openai request failed: {status}: {text}").into());
96    }
97
98    let mut decoder = StreamDecoder::new();
99    let mut accumulated_text = String::new();
100    let mut tool_use_blocks: Vec<Value> = Vec::new();
101    let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
102    let mut sink: Vec<OaiEvent> = Vec::with_capacity(4);
103    let mut stream = resp.bytes_stream();
104
105    while let Some(chunk) = tokio::select! {
106        chunk = stream.next() => chunk,
107        _ = cancel.cancelled() => {
108            return Err("request canceled".into());
109        }
110    } {
111        let chunk = chunk?;
112        buf.extend_from_slice(&chunk);
113
114        // Scan for newline-delimited SSE lines (SIMD-accelerated via memchr)
115        while let Some(nl) = memchr::memchr(b'\n', &buf) {
116            let line_bytes = buf.split_to(nl + 1); // O(1) — ref-counted split
117            let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
118
119            sink.clear();
120            decoder.push_line(line, &mut sink);
121            handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
122        }
123    }
124
125    // Flush any remaining buffered line + final Done
126    if !buf.is_empty() {
127        let line = std::str::from_utf8(&buf).unwrap_or("");
128        sink.clear();
129        decoder.push_line(line, &mut sink);
130        handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
131    }
132    sink.clear();
133    decoder.finish(&mut sink);
134    handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
135
136    // Build Anthropic-shaped final response
137    let mut content: Vec<Value> = Vec::new();
138    if !accumulated_text.is_empty() {
139        content.push(json!({"type": "text", "text": accumulated_text}));
140    }
141    content.extend(tool_use_blocks);
142
143    Ok(json!({
144        "role": "assistant",
145        "content": content,
146    }))
147}
148
149#[allow(clippy::too_many_arguments)]
150pub(crate) async fn call_codex_stream_inner(
151    cfg: &ProviderConfig,
152    client: &reqwest::Client,
153    tools_schema: &[Value],
154    system_prompt: &Option<String>,
155    messages: &[Value],
156    tx: &mpsc::UnboundedSender<StreamEvent>,
157    temperature: Option<f32>,
158    max_tokens: Option<u32>,
159    cancel: &tokio_util::sync::CancellationToken,
160) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
161    let creds = if cfg.api_key.is_empty() {
162        crate::auth::ensure_fresh_provider_token(client, "openai-codex").await?
163    } else {
164        crate::auth::OAuthCredentials {
165            auth_type: "oauth".to_string(),
166            refresh: String::new(),
167            access: cfg.api_key.clone(),
168            expires: 0,
169            account_id: None,
170        }
171    };
172    let account_id = creds
173        .account_id
174        .clone()
175        .or_else(|| crate::auth::extract_codex_account_id(&creds.access))
176        .ok_or("Failed to extract ChatGPT account id from Codex token")?;
177
178    let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
179    let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
180    let tools: Vec<Value> = oai_tools
181        .into_iter()
182        .map(|tool| {
183            json!({
184                "type": "function",
185                "name": tool.function.name,
186                "description": tool.function.description.unwrap_or_default(),
187                "parameters": tool.function.parameters,
188            })
189        })
190        .collect();
191
192    let mut body = json!({
193        "model": cfg.model,
194        "store": false,
195        "stream": true,
196        "instructions": codex_instructions(system_prompt),
197        "input": codex_input_messages(oai_messages),
198        "tool_choice": "auto",
199        "parallel_tool_calls": true,
200        "include": ["reasoning.encrypted_content"],
201        "text": { "verbosity": "medium" },
202    });
203    if !tools.is_empty() {
204        body["tools"] = Value::Array(tools);
205    }
206    if let Some(temp) = temperature {
207        body["temperature"] = json!(temp);
208    }
209    if let Some(max) = max_tokens {
210        body["max_output_tokens"] = json!(max);
211    }
212
213    let url = format!(
214        "{}/codex/responses",
215        cfg.base_url.trim_end_matches('/').trim_end_matches("/codex")
216    );
217    tracing::debug!(url=%url, model=%cfg.model, "codex stream request");
218
219    let resp = client
220        .post(&url)
221        .bearer_auth(&creds.access)
222        .header("chatgpt-account-id", account_id)
223        .header("originator", "synaps")
224        .header("OpenAI-Beta", "responses=experimental")
225        .header("content-type", "application/json")
226        .header("accept", "text/event-stream")
227        .json(&body)
228        .send()
229        .await?;
230
231    if !resp.status().is_success() {
232        let status = resp.status();
233        let text = resp.text().await.unwrap_or_default();
234        return Err(format!("codex request failed: {status}: {text}").into());
235    }
236
237    let mut accumulated_text = String::new();
238    let mut parser = CodexSseDecoder::default();
239    let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
240    let mut stream = resp.bytes_stream();
241
242    while let Some(chunk) = tokio::select! {
243        chunk = stream.next() => chunk,
244        _ = cancel.cancelled() => {
245            return Err("request canceled".into());
246        }
247    } {
248        let chunk = chunk?;
249        buf.extend_from_slice(&chunk);
250        while let Some(nl) = memchr::memchr(b'\n', &buf) {
251            let line_bytes = buf.split_to(nl + 1);
252            let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
253            parser.push_line(line, tx, &mut accumulated_text);
254        }
255    }
256    if !buf.is_empty() {
257        let line = std::str::from_utf8(&buf).unwrap_or("");
258        parser.push_line(line, tx, &mut accumulated_text);
259    }
260    parser.finish();
261
262    let mut content: Vec<Value> = Vec::new();
263    if !accumulated_text.is_empty() {
264        content.push(json!({"type": "text", "text": accumulated_text}));
265    }
266    content.extend(translate::tool_calls_to_content_blocks(&parser.completed_tools, &name_map));
267
268    Ok(json!({
269        "role": "assistant",
270        "content": content,
271    }))
272}
273
/// Policy text appended to every Codex `instructions` field. It tells the
/// model the harness is non-interactive and it should not pause for
/// confirmation between phases.
const CODEX_AUTONOMOUS_LOOP_POLICY: &str = "\n\n[Synaps autonomous harness policy]\nThis harness is non-interactive after the user has provided the task/spec. Do not stop at phase boundaries, milestones, checkpoints, or after presenting a plan unless the full requested job is complete. Do not ask the user whether to continue. When a phase/checkpoint is reached, run any relevant verification and continue autonomously until the full requested job is complete, blocked by an unrecoverable error, or explicit user instructions require stopping.\n[End Synaps autonomous harness policy]";

/// Build the Codex `instructions` string from the optional system prompt.
///
/// The autonomous-loop policy is appended exactly once: if the prompt
/// already contains the policy's opening marker it is returned unchanged.
/// With no prompt, the result is the policy text alone.
fn codex_instructions(system_prompt: &Option<String>) -> String {
    const POLICY_MARKER: &str = "[Synaps autonomous harness policy]";

    let prompt = system_prompt.as_deref().unwrap_or_default();
    if prompt.contains(POLICY_MARKER) {
        return prompt.to_owned();
    }
    [prompt, CODEX_AUTONOMOUS_LOOP_POLICY].concat()
}
284
285fn codex_input_messages(messages: Vec<ChatMessage>) -> Vec<Value> {
286    let mut out = Vec::new();
287    for msg in messages {
288        if let Some(tool_calls) = msg.tool_calls {
289            for call in tool_calls {
290                // The Responses API rejects `id` values that are not the
291                // original `fc_…` output-item id. We only carry the
292                // `call_…` correlation id today (see types::ToolCall),
293                // so emit `id` *only* when the value actually starts
294                // with `fc`. `call_id` is sufficient on its own to
295                // correlate the eventual `function_call_output`.
296                let mut item = json!({
297                    "type": "function_call",
298                    "call_id": call.id,
299                    "name": call.function.name,
300                    "arguments": call.function.arguments,
301                });
302                if call.id.starts_with("fc") {
303                    item["id"] = Value::from(call.id.clone());
304                }
305                out.push(item);
306            }
307            continue;
308        }
309        if msg.role == "tool" {
310            // Skip tool results with no call_id — sending an empty call_id
311            // to the Codex API would cause a 400 with a confusing error.
312            if let Some(call_id) = msg.tool_call_id {
313                out.push(json!({
314                    "type": "function_call_output",
315                    "call_id": call_id,
316                    "output": msg.content.unwrap_or_default(),
317                }));
318            }
319            continue;
320        }
321        out.push(json!({
322            "role": msg.role,
323            "content": msg.content.unwrap_or_default(),
324        }));
325    }
326    out
327}
328
/// Incremental decoder for the Codex Responses API SSE stream.
///
/// Fed one line at a time via `push_line`. It forwards text and tool-call
/// events to the runtime channel and collects finished tool calls in
/// `completed_tools`.
#[derive(Default)]
struct CodexSseDecoder {
    /// `data:` payload of the event currently being received; parsed and
    /// dispatched when the event's terminating blank line arrives.
    buffer: String,
    /// In-progress tool calls, indexed directly by the event's
    /// `output_index`. Padding slots (e.g. for non-function output items)
    /// stay at their default, empty value.
    active_tools: Vec<CodexToolAccumulator>,
    /// Finalized tool calls, kept unique by call id.
    completed_tools: Vec<ToolCall>,
}
335
/// Streaming state for a single function-call output item.
#[derive(Default)]
struct CodexToolAccumulator {
    /// Correlation id: the item's `call_id`, falling back to its `id`.
    id: String,
    /// Tool name as reported by the stream.
    name: String,
    /// Raw JSON argument text: concatenated argument deltas, replaced by the
    /// `arguments` of the `response.output_item.done` item when present.
    arguments: String,
    /// Set once a `ToolUseStart` event has been sent for this tool.
    started: bool,
}
343
344/// Parse a function-call arguments string into a JSON `Value`, mirroring
345/// `runtime::api::parse_tool_input` so the chat UI's `LlmEvent::ToolUse`
346/// handling sees the same shape regardless of provider.
347///
348/// Empty / whitespace input becomes `{}`. Invalid JSON becomes
349/// `{"__parse_error": "..."}` — the agent loop already understands that
350/// shape and converts it into an `is_error: true` tool_result.
351fn parse_tool_arguments(raw: &str) -> Value {
352    if raw.trim().is_empty() {
353        return json!({});
354    }
355    match serde_json::from_str(raw) {
356        Ok(v) => v,
357        Err(e) => json!({ "__parse_error": format!("invalid tool input JSON: {}", e) }),
358    }
359}
360
361impl CodexSseDecoder {
362    fn push_line(
363        &mut self,
364        line: &str,
365        tx: &mpsc::UnboundedSender<StreamEvent>,
366        text_acc: &mut String,
367    ) {
368        let line = line.trim_end_matches('\r');
369        if line.is_empty() {
370            if !self.buffer.is_empty() {
371                let payload = std::mem::take(&mut self.buffer);
372                self.push_payload(&payload, tx, text_acc);
373            }
374            return;
375        }
376        let Some(data) = line.strip_prefix("data:").map(str::trim_start) else {
377            return;
378        };
379        if data == "[DONE]" {
380            self.finish();
381            return;
382        }
383        self.buffer.push_str(data);
384    }
385
386    fn push_payload(
387        &mut self,
388        payload: &str,
389        tx: &mpsc::UnboundedSender<StreamEvent>,
390        text_acc: &mut String,
391    ) {
392        let Ok(event) = serde_json::from_str::<Value>(payload) else {
393            return;
394        };
395        let event_type = event.get("type").and_then(Value::as_str).unwrap_or_default();
396        match event_type {
397            "response.output_text.delta" => {
398                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
399                    text_acc.push_str(delta);
400                    let _ = tx.send(StreamEvent::Llm(crate::runtime::types::LlmEvent::Text(
401                        delta.to_string(),
402                    )));
403                }
404            }
405            "response.output_item.added" => {
406                if let Some(item) = event.get("item") {
407                    let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
408                    self.add_tool_from_item(idx, item, tx);
409                }
410            }
411            "response.function_call_arguments.delta" => {
412                let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
413                let delta = event.get("delta").and_then(Value::as_str).unwrap_or_default();
414                if !delta.is_empty() {
415                    let tool = self.ensure_tool(idx);
416                    tool.arguments.push_str(delta);
417                    let tool_id = tool.id.clone();
418                    let _ = tx.send(StreamEvent::Llm(
419                        crate::runtime::types::LlmEvent::ToolUseDelta {
420                            tool_id,
421                            delta: delta.to_string(),
422                        },
423                    ));
424                }
425            }
426            "response.output_item.done" => {
427                if let Some(item) = event.get("item") {
428                    let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
429                    self.complete_tool_from_item(idx, item, tx);
430                }
431            }
432            "response.completed" | "response.done" => {
433                self.push_usage(&event, tx);
434                self.finish();
435            }
436            _ => {}
437        }
438    }
439
440    fn ensure_tool(&mut self, idx: usize) -> &mut CodexToolAccumulator {
441        while self.active_tools.len() <= idx {
442            self.active_tools.push(CodexToolAccumulator::default());
443        }
444        &mut self.active_tools[idx]
445    }
446
447    fn add_tool_from_item(
448        &mut self,
449        idx: usize,
450        item: &Value,
451        tx: &mpsc::UnboundedSender<StreamEvent>,
452    ) {
453        if item.get("type").and_then(Value::as_str) != Some("function_call") {
454            return;
455        }
456        let tool = self.ensure_tool(idx);
457        if let Some(id) = item
458            .get("call_id")
459            .or_else(|| item.get("id"))
460            .and_then(Value::as_str)
461        {
462            tool.id = id.to_string();
463        }
464        if let Some(name) = item.get("name").and_then(Value::as_str) {
465            tool.name = name.to_string();
466        }
467        if !tool.started && !tool.name.is_empty() {
468            tool.started = true;
469            let _ = tx.send(StreamEvent::Llm(
470                crate::runtime::types::LlmEvent::ToolUseStart {
471                    tool_name: tool.name.clone(),
472                    tool_id: tool.id.clone(),
473                },
474            ));
475        }
476    }
477
478    fn complete_tool_from_item(
479        &mut self,
480        idx: usize,
481        item: &Value,
482        tx: &mpsc::UnboundedSender<StreamEvent>,
483    ) {
484        if item.get("type").and_then(Value::as_str) != Some("function_call") {
485            return;
486        }
487        let tool = self.ensure_tool(idx);
488        if let Some(id) = item
489            .get("call_id")
490            .or_else(|| item.get("id"))
491            .and_then(Value::as_str)
492        {
493            tool.id = id.to_string();
494        }
495        if let Some(name) = item.get("name").and_then(Value::as_str) {
496            tool.name = name.to_string();
497        }
498        if let Some(arguments) = item.get("arguments").and_then(Value::as_str) {
499            tool.arguments = arguments.to_string();
500        }
501        if !tool.started && !tool.name.is_empty() {
502            tool.started = true;
503            let _ = tx.send(StreamEvent::Llm(
504                crate::runtime::types::LlmEvent::ToolUseStart {
505                    tool_name: tool.name.clone(),
506                    tool_id: tool.id.clone(),
507                },
508            ));
509        }
510        let completed = if !tool.id.is_empty() && !tool.name.is_empty() {
511            Some(ToolCall {
512                id: tool.id.clone(),
513                kind: "function".to_string(),
514                function: super::types::FunctionCall {
515                    name: tool.name.clone(),
516                    arguments: tool.arguments.clone(),
517                },
518            })
519        } else {
520            None
521        };
522        if let Some(call) = completed {
523            if self.completed_tools.iter().any(|done| done.id == call.id) {
524                return;
525            }
526            // Emit the finalized `ToolUse` event so the chat UI can collapse
527            // the streaming `ToolUseStart` (animated) into a stable
528            // `ToolUse` block. Without this the bash-trace animation
529            // persists forever and parallel tool blocks render as "still
530            // running" even after they've completed. Mirrors the
531            // Anthropic path in `runtime/api.rs` which emits the same
532            // event on tool-use content_block_stop.
533            let input = parse_tool_arguments(&call.function.arguments);
534            let _ = tx.send(StreamEvent::Llm(
535                crate::runtime::types::LlmEvent::ToolUse {
536                    tool_name: call.function.name.clone(),
537                    tool_id: call.id.clone(),
538                    input,
539                },
540            ));
541            self.completed_tools.push(ToolCall {
542                id: call.id,
543                kind: call.kind,
544                function: call.function,
545            });
546        }
547    }
548
549    fn push_usage(&self, event: &Value, tx: &mpsc::UnboundedSender<StreamEvent>) {
550        let usage = event
551            .get("response")
552            .and_then(|r| r.get("usage"))
553            .or_else(|| event.get("usage"));
554        let input = usage
555            .and_then(|u| u.get("input_tokens"))
556            .and_then(Value::as_u64)
557            .unwrap_or(0);
558        let output = usage
559            .and_then(|u| u.get("output_tokens"))
560            .and_then(Value::as_u64)
561            .unwrap_or(0);
562        if input > 0 || output > 0 {
563            let _ = tx.send(StreamEvent::Session(crate::runtime::types::SessionEvent::Usage {
564                input_tokens: input,
565                output_tokens: output,
566                cache_read_input_tokens: 0,
567                cache_creation_input_tokens: 0,
568                cache_creation_5m: None,
569                cache_creation_1h: None,
570                model: None,
571            }));
572        }
573    }
574
575    fn finish(&mut self) {
576        for tool in self.active_tools.drain(..) {
577            if !tool.id.is_empty()
578                && !tool.name.is_empty()
579                && !self.completed_tools.iter().any(|done| done.id == tool.id)
580            {
581                self.completed_tools.push(ToolCall {
582                    id: tool.id,
583                    kind: "function".to_string(),
584                    function: super::types::FunctionCall {
585                        name: tool.name,
586                        arguments: tool.arguments,
587                    },
588                });
589            }
590        }
591    }
592}
593
594fn handle_events(
595    events: &[OaiEvent],
596    tx: &mpsc::UnboundedSender<StreamEvent>,
597    text_acc: &mut String,
598    tool_blocks: &mut Vec<Value>,
599    name_map: &translate::ToolNameMap,
600) {
601    for ev in events {
602        if let OaiEvent::TextDelta(t) = ev {
603            text_acc.push_str(t);
604        }
605        if let OaiEvent::ToolCallsComplete { calls, .. } = ev {
606            tool_blocks.extend(translate::tool_calls_to_content_blocks(calls, name_map));
607        }
608        if let Some(se) = translate::oai_event_to_llm(ev) {
609            let _ = tx.send(se);
610        }
611    }
612}
613
#[cfg(test)]
mod codex_input_messages_tests {
    //! Regression tests for the Codex Responses-API `input` shape.
    //!
    //! Background: the Responses API distinguishes two ids per tool
    //! invocation — `id` (the *output item id*, prefix `fc_…`) and
    //! `call_id` (the *function call id*, prefix `call_…`). When echoing
    //! a previous `function_call` back as an input item, supplying an
    //! `id` whose value is *not* a `fc_…` triggers
    //!
    //!   400 Bad Request: Invalid 'input[N].id': 'call_…'.
    //!   Expected an ID that begins with 'fc'.
    //!
    //! `id` is *optional* on input items — only `call_id` is required to
    //! correlate the eventual `function_call_output`. We elect not to
    //! emit `id` unless we actually have a real `fc_…` value to send.
    //!
    //! The `codex_instructions` tests also live here because that helper
    //! builds the sibling `instructions` field of the same request.

    use super::*;
    use super::super::types::{ChatMessage, FunctionCall, ToolCall};

    fn sample_tool_call() -> ToolCall {
        ToolCall {
            id: "call_nZYquCuGUh8Qs9H51dwHMDgs".to_string(),
            kind: "function".to_string(),
            function: FunctionCall {
                name: "bash".to_string(),
                arguments: r#"{"command":"ls"}"#.to_string(),
            },
        }
    }

    #[test]
    fn codex_instructions_appends_autonomous_loop_policy() {
        let instructions = codex_instructions(&Some("Project-specific rules.".to_string()));
        assert!(instructions.contains("Project-specific rules."));
        assert!(instructions.contains("Do not stop at phase boundaries"));
        assert!(instructions.contains("Do not ask the user whether to continue"));
        assert!(instructions.contains("continue autonomously until the full requested job is complete"));
    }

    #[test]
    fn codex_instructions_does_not_duplicate_policy() {
        // Feeding already-augmented instructions back in (e.g. a system
        // prompt that was persisted after a previous turn) must not append
        // the policy a second time.
        let once = codex_instructions(&Some("Project-specific rules.".to_string()));
        let twice = codex_instructions(&Some(once.clone()));
        assert_eq!(twice, once);
        assert_eq!(twice.matches("[Synaps autonomous harness policy]").count(), 1);
    }

    #[test]
    fn codex_instructions_without_prompt_is_policy_only() {
        assert_eq!(codex_instructions(&None), CODEX_AUTONOMOUS_LOOP_POLICY);
    }

    #[test]
    fn function_call_input_omits_non_fc_id() {
        let messages = vec![ChatMessage::assistant_tool_calls(vec![sample_tool_call()])];
        let out = codex_input_messages(messages);
        assert_eq!(out.len(), 1, "one tool_call → one input item");
        let item = &out[0];
        assert_eq!(item.get("type").and_then(Value::as_str), Some("function_call"));
        assert!(
            item.get("id").is_none(),
            "must not echo a non-`fc_` id back; got {:?}",
            item.get("id"),
        );
        assert_eq!(
            item.get("call_id").and_then(Value::as_str),
            Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
        );
        assert_eq!(item.get("name").and_then(Value::as_str), Some("bash"));
    }

    #[test]
    fn function_call_input_keeps_real_fc_id() {
        // If we ever do have a genuine `fc_…` id (round-tripped from the
        // Responses API), we *should* echo it.
        let mut call = sample_tool_call();
        call.id = "fc_abc123".to_string();
        let messages = vec![ChatMessage::assistant_tool_calls(vec![call])];
        let out = codex_input_messages(messages);
        let item = &out[0];
        assert_eq!(item.get("id").and_then(Value::as_str), Some("fc_abc123"));
        assert_eq!(item.get("call_id").and_then(Value::as_str), Some("fc_abc123"));
    }

    #[test]
    fn function_call_output_round_trips_call_id() {
        // The follow-up tool message must reference the original call_id.
        let messages = vec![ChatMessage::tool_result(
            "call_nZYquCuGUh8Qs9H51dwHMDgs",
            "bash",
            "total 0",
        )];
        let out = codex_input_messages(messages);
        let item = &out[0];
        assert_eq!(
            item.get("type").and_then(Value::as_str),
            Some("function_call_output"),
        );
        assert_eq!(
            item.get("call_id").and_then(Value::as_str),
            Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
        );
        assert_eq!(item.get("output").and_then(Value::as_str), Some("total 0"));
    }
}
707
708#[cfg(test)]
709mod codex_decoder_tests {
710    //! Regression tests for `CodexSseDecoder`.
711    //!
712    //! The decoder is sync — we drive it via `push_line` and capture
713    //! emitted `StreamEvent`s from an `unbounded_channel` using
714    //! `try_recv`, no async runtime needed.
715
716    use super::*;
717    use crate::runtime::types::{LlmEvent, SessionEvent, StreamEvent};
718
719    fn collect_events(rx: &mut mpsc::UnboundedReceiver<StreamEvent>) -> Vec<StreamEvent> {
720        let mut out = Vec::new();
721        while let Ok(ev) = rx.try_recv() {
722            out.push(ev);
723        }
724        out
725    }
726
727    fn drive(lines: &[&str]) -> (CodexSseDecoder, String, Vec<StreamEvent>) {
728        let (tx, mut rx) = mpsc::unbounded_channel();
729        let mut decoder = CodexSseDecoder::default();
730        let mut text_acc = String::new();
731        for line in lines {
732            decoder.push_line(line, &tx, &mut text_acc);
733        }
734        let events = collect_events(&mut rx);
735        (decoder, text_acc, events)
736    }
737
738    #[test]
739    fn text_delta_aggregates_into_text_acc_and_emits_text_events() {
740        let lines = [
741            r#"data: {"type":"response.output_text.delta","delta":"Hello, "}"#,
742            "",
743            r#"data: {"type":"response.output_text.delta","delta":"world!"}"#,
744            "",
745        ];
746        let (_decoder, text_acc, events) = drive(&lines);
747        assert_eq!(text_acc, "Hello, world!");
748        let texts: Vec<_> = events
749            .iter()
750            .filter_map(|e| match e {
751                StreamEvent::Llm(LlmEvent::Text(t)) => Some(t.as_str()),
752                _ => None,
753            })
754            .collect();
755        assert_eq!(texts, vec!["Hello, ", "world!"]);
756    }
757
758    #[test]
759    fn single_function_call_completes_via_output_item_done() {
760        let lines = [
761            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
762            "",
763            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\""}"#,
764            "",
765            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":":\"ls\"}"}"#,
766            "",
767            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
768            "",
769        ];
770        let (decoder, _text, events) = drive(&lines);
771
772        assert_eq!(decoder.completed_tools.len(), 1);
773        let tool = &decoder.completed_tools[0];
774        assert_eq!(tool.id, "call_abc");
775        assert_eq!(tool.function.name, "bash");
776        assert_eq!(tool.function.arguments, r#"{"cmd":"ls"}"#);
777
778        // Exactly one ToolUseStart for the tool.
779        let starts: Vec<_> = events
780            .iter()
781            .filter_map(|e| match e {
782                StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
783                    Some((tool_name.as_str(), tool_id.as_str()))
784                }
785                _ => None,
786            })
787            .collect();
788        assert_eq!(
789            starts,
790            vec![("bash", "call_abc")],
791            "exactly one ToolUseStart with correct tool_id"
792        );
793
794        // Two argument deltas streamed (each carrying the tool_id so
795        // parallel calls can be routed correctly by the chat UI).
796        let deltas: Vec<_> = events
797            .iter()
798            .filter_map(|e| match e {
799                StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
800                    Some((tool_id.as_str(), delta.as_str()))
801                }
802                _ => None,
803            })
804            .collect();
805        assert_eq!(
806            deltas,
807            vec![("call_abc", r#"{"cmd""#), ("call_abc", r#":"ls"}"#)]
808        );
809    }
810
811    #[test]
812    fn parallel_tool_calls_indexed_separately() {
813        let lines = [
814            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
815            "",
816            r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
817            "",
818            r#"data: {"type":"response.function_call_arguments.delta","output_index":1,"delta":"{\"path\":\"a\"}"}"#,
819            "",
820            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\":\"ls\"}"}"#,
821            "",
822            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
823            "",
824            r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
825            "",
826        ];
827        let (decoder, _text, _events) = drive(&lines);
828
829        assert_eq!(decoder.completed_tools.len(), 2);
830        let mut by_id: std::collections::BTreeMap<&str, &ToolCall> = std::collections::BTreeMap::new();
831        for tool in &decoder.completed_tools {
832            by_id.insert(tool.id.as_str(), tool);
833        }
834        assert_eq!(by_id["call_1"].function.name, "bash");
835        assert_eq!(by_id["call_1"].function.arguments, r#"{"cmd":"ls"}"#);
836        assert_eq!(by_id["call_2"].function.name, "read");
837        assert_eq!(by_id["call_2"].function.arguments, r#"{"path":"a"}"#);
838    }
839
840    #[test]
841    fn output_item_done_emits_tool_use_event() {
842        // Regression: the codex decoder must emit `LlmEvent::ToolUse` once a
843        // function_call's `output_item.done` arrives so the chat UI can
844        // collapse `ChatMessage::ToolUseStart` (animated) into the finalized
845        // `ChatMessage::ToolUse`. Without this the bash-trace animation
846        // persists forever and parallel tool blocks render as "still
847        // running" even after they've completed.
848        let lines = [
849            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
850            "",
851            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"command\":\"ls\"}"}"#,
852            "",
853            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
854            "",
855        ];
856        let (_decoder, _text, events) = drive(&lines);
857
858        let tool_uses: Vec<_> = events
859            .iter()
860            .filter_map(|e| match e {
861                StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
862                    Some((tool_name.as_str(), tool_id.as_str(), input.clone()))
863                }
864                _ => None,
865            })
866            .collect();
867        assert_eq!(tool_uses.len(), 1, "expected exactly one ToolUse finalize event");
868        assert_eq!(tool_uses[0].0, "bash");
869        assert_eq!(tool_uses[0].1, "call_abc");
870        assert_eq!(
871            tool_uses[0].2,
872            serde_json::json!({"command": "ls"}),
873            "input must be parsed as a JSON Value, not a string"
874        );
875    }
876
877    #[test]
878    fn parallel_tool_calls_emit_tool_use_per_index() {
879        // Regression: parallel tool calls must each get their own ToolUse
880        // finalize event with the correct tool_id, so the chat UI can route
881        // their results back to the right block by id.
882        let lines = [
883            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
884            "",
885            r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
886            "",
887            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
888            "",
889            r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
890            "",
891        ];
892        let (_decoder, _text, events) = drive(&lines);
893
894        let tool_uses: Vec<_> = events
895            .iter()
896            .filter_map(|e| match e {
897                StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
898                    Some((tool_name.clone(), tool_id.clone(), input.clone()))
899                }
900                _ => None,
901            })
902            .collect();
903
904        assert_eq!(tool_uses.len(), 2, "one ToolUse finalize per parallel call");
905        let by_id: std::collections::BTreeMap<&str, &(String, String, serde_json::Value)> =
906            tool_uses.iter().map(|t| (t.1.as_str(), t)).collect();
907        assert_eq!(by_id["call_1"].0, "bash");
908        assert_eq!(by_id["call_1"].2, serde_json::json!({"command": "ls"}));
909        assert_eq!(by_id["call_2"].0, "read");
910        assert_eq!(by_id["call_2"].2, serde_json::json!({"path": "a"}));
911    }
912
913    #[test]
914    fn malformed_arguments_emit_tool_use_with_parse_error() {
915        // If the model produces invalid JSON arguments, surface a structured
916        // parse error in the `input` (matching how the Anthropic path
917        // handles it via parse_tool_input) so the agent loop can return an
918        // error tool_result instead of silently dropping the tool.
919        let lines = [
920            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash"}}"#,
921            "",
922            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash","arguments":"{not json"}}"#,
923            "",
924        ];
925        let (_decoder, _text, events) = drive(&lines);
926
927        let tool_use = events.iter().find_map(|e| match e {
928            StreamEvent::Llm(LlmEvent::ToolUse { input, .. }) => Some(input.clone()),
929            _ => None,
930        });
931        let input = tool_use.expect("ToolUse event missing");
932        assert!(
933            input.get("__parse_error").and_then(Value::as_str).is_some(),
934            "malformed arguments must surface __parse_error, got {input}"
935        );
936    }
937
938    #[test]
939    fn response_completed_emits_usage_event() {
940        let lines = [
941            r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":42,"output_tokens":17}}}"#,
942            "",
943        ];
944        let (_decoder, _text, events) = drive(&lines);
945        let usage = events.iter().find_map(|e| match e {
946            StreamEvent::Session(SessionEvent::Usage {
947                input_tokens,
948                output_tokens,
949                ..
950            }) => Some((*input_tokens, *output_tokens)),
951            _ => None,
952        });
953        assert_eq!(usage, Some((42, 17)));
954    }
955
956    #[test]
957    fn response_completed_with_zero_usage_emits_nothing() {
958        let lines = [
959            r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":0,"output_tokens":0}}}"#,
960            "",
961        ];
962        let (_decoder, _text, events) = drive(&lines);
963        let any_usage = events.iter().any(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })));
964        assert!(!any_usage, "zero-token usage should be suppressed");
965    }
966
967    #[test]
968    fn done_marker_finishes_decoder() {
969        let lines = [
970            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_x","name":"bash"}}"#,
971            "",
972            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{}"}"#,
973            "",
974            "data: [DONE]",
975            "",
976        ];
977        let (decoder, _text, _events) = drive(&lines);
978        // active_tools promoted to completed_tools by finish().
979        assert_eq!(decoder.completed_tools.len(), 1);
980        assert_eq!(decoder.completed_tools[0].id, "call_x");
981        assert_eq!(decoder.completed_tools[0].function.arguments, "{}");
982    }
983
984    #[test]
985    fn finish_is_idempotent_no_double_emit() {
986        let lines = [
987            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_y","name":"bash","arguments":"{}"}}"#,
988            "",
989        ];
990        let (mut decoder, _text, _events) = drive(&lines);
991        assert_eq!(decoder.completed_tools.len(), 1);
992
993        // Calling finish() again must not duplicate the tool.
994        decoder.finish();
995        assert_eq!(
996            decoder.completed_tools.len(),
997            1,
998            "finish() called twice must not double-emit"
999        );
1000    }
1001
1002    #[test]
1003    fn finish_drains_active_tools_for_state_hygiene() {
1004        // After [DONE], any leftover active tool entries should have been
1005        // promoted *and* drained from `active_tools`. This guards against
1006        // future code paths that re-call finish() (or new event types
1007        // that would otherwise re-iterate the old buffer).
1008        let lines = [
1009            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_z","name":"bash"}}"#,
1010            "",
1011            "data: [DONE]",
1012            "",
1013        ];
1014        let (decoder, _text, _events) = drive(&lines);
1015        assert_eq!(decoder.completed_tools.len(), 1);
1016        assert!(
1017            decoder.active_tools.is_empty(),
1018            "active_tools must be drained after finish()"
1019        );
1020    }
1021
1022    #[test]
1023    fn unknown_event_types_are_ignored() {
1024        let lines = [
1025            r#"data: {"type":"response.future_unknown_event","payload":{"x":1}}"#,
1026            "",
1027            r#"data: {"type":"response.output_text.delta","delta":"hi"}"#,
1028            "",
1029        ];
1030        let (_decoder, text_acc, _events) = drive(&lines);
1031        assert_eq!(text_acc, "hi");
1032    }
1033}