//! Streaming path for OpenAI-compatible providers.
//!
//! Mirrors `ApiMethods::call_api_stream_inner` but speaks OpenAI chat/completions
//! and translates back to Anthropic-shaped events for the rest of the runtime.

6use super::translate;
7use super::types::{ChatMessage, OaiEvent, ProviderConfig, StreamOptions, ToolCall};
8use super::wire::StreamDecoder;
9use crate::runtime::types::StreamEvent;
10use futures::StreamExt;
11use serde_json::{json, Value};
12use tokio::sync::mpsc;
13
/// Run a single streaming request against an OpenAI-compatible endpoint.
///
/// Returns the final assistant response as an Anthropic-shaped content Value
/// (`{"content": [..text.., ..tool_use..]}`) so the outer agent loop can keep
/// using the same handling as the native Anthropic path.
///
/// Incremental events are forwarded on `tx` while the response streams in;
/// the request is aborted with a "request canceled" error as soon as
/// `cancel` fires.
pub(crate) async fn call_oai_stream_inner(
    cfg: &ProviderConfig,
    client: &reqwest::Client,
    tools_schema: &[Value],
    system_prompt: &Option<String>,
    messages: &[Value],
    tx: &mpsc::UnboundedSender<StreamEvent>,
    temperature: Option<f32>,
    max_tokens: Option<u32>,
    thinking_budget: u32,
    cancel: &tokio_util::sync::CancellationToken,
) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
    // Translate Anthropic-shaped tools/messages into the OpenAI wire shape.
    // `name_map` records any tool renames so responses can be mapped back.
    let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
    let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
    let tools_opt = if oai_tools.is_empty() { None } else { Some(oai_tools) };

    // Google's OpenAI-compat endpoint rejects stream_options
    let stream_options = if cfg.base_url.contains("googleapis.com") {
        None
    } else {
        Some(StreamOptions { include_usage: true })
    };

    // Build the request body, inserting optional fields only when present.
    let mut body = serde_json::Map::new();
    body.insert("model".to_string(), json!(cfg.model.clone()));
    body.insert("messages".to_string(), serde_json::to_value(oai_messages)?);
    body.insert("stream".to_string(), json!(true));
    if let Some(stream_options) = stream_options {
        body.insert("stream_options".to_string(), serde_json::to_value(stream_options)?);
    }
    if let Some(max_tokens) = max_tokens {
        body.insert("max_tokens".to_string(), json!(max_tokens));
    }
    if let Some(temperature) = temperature {
        body.insert("temperature".to_string(), json!(temperature));
    }
    if let Some(tools) = tools_opt {
        body.insert("tools".to_string(), serde_json::to_value(tools)?);
    }
    // Provider/model-specific reasoning ("thinking") parameters.
    super::reasoning::apply_openai_reasoning_params(
        &mut body,
        super::reasoning::provider_for_key(&cfg.provider),
        &cfg.model,
        thinking_budget,
    );
    let body = Value::Object(body);

    let url = format!("{}/chat/completions", cfg.base_url.trim_end_matches('/'));

    tracing::debug!(url=%url, model=%cfg.model, "openai stream request");

    let resp = match client
        .post(&url)
        .bearer_auth(&cfg.api_key)
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .json(&body)
        .send()
        .await
    {
        Ok(r) => r,
        Err(e) => {
            // Friendlier message for the common "local server not running" case.
            if e.is_connect() && url.contains("localhost") {
                return Err(format!(
                    "Can't reach local endpoint at {} — is Ollama/LM Studio running?",
                    url
                ).into());
            }
            return Err(e.into());
        }
    };

    if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        return Err(format!("openai request failed: {status}: {text}").into());
    }

    let mut decoder = StreamDecoder::new();
    let mut accumulated_text = String::new();
    let mut tool_use_blocks: Vec<Value> = Vec::new();
    let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
    let mut sink: Vec<OaiEvent> = Vec::with_capacity(4); // reused per line to avoid realloc
    let mut stream = resp.bytes_stream();

    // Pump the byte stream, bailing out promptly if the caller cancels.
    while let Some(chunk) = tokio::select! {
        chunk = stream.next() => chunk,
        _ = cancel.cancelled() => {
            return Err("request canceled".into());
        }
    } {
        let chunk = chunk?;
        buf.extend_from_slice(&chunk);

        // Scan for newline-delimited SSE lines (SIMD-accelerated via memchr)
        while let Some(nl) = memchr::memchr(b'\n', &buf) {
            let line_bytes = buf.split_to(nl + 1); // O(1) — ref-counted split
            let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");

            sink.clear();
            decoder.push_line(line, &mut sink);
            handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
        }
    }

    // Flush any remaining buffered line + final Done
    if !buf.is_empty() {
        let line = std::str::from_utf8(&buf).unwrap_or("");
        sink.clear();
        decoder.push_line(line, &mut sink);
        handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
    }
    sink.clear();
    decoder.finish(&mut sink);
    handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);

    // Build Anthropic-shaped final response
    let mut content: Vec<Value> = Vec::new();
    if !accumulated_text.is_empty() {
        content.push(json!({"type": "text", "text": accumulated_text}));
    }
    content.extend(tool_use_blocks);

    Ok(json!({
        "role": "assistant",
        "content": content,
    }))
}
147
/// Run a single streaming request against the ChatGPT Codex Responses API.
///
/// When `cfg.api_key` is empty, OAuth credentials for the `openai-codex`
/// provider are loaded (and refreshed if needed); otherwise `cfg.api_key`
/// is used directly as the access token. The `chatgpt-account-id` header
/// the API requires is taken from the credentials or extracted from the
/// access token.
///
/// Streams incremental events on `tx` and returns the final assistant turn
/// as an Anthropic-shaped `{role, content}` Value, matching
/// `call_oai_stream_inner`.
pub(crate) async fn call_codex_stream_inner(
    cfg: &ProviderConfig,
    client: &reqwest::Client,
    tools_schema: &[Value],
    system_prompt: &Option<String>,
    messages: &[Value],
    tx: &mpsc::UnboundedSender<StreamEvent>,
    temperature: Option<f32>,
    max_tokens: Option<u32>,
    cancel: &tokio_util::sync::CancellationToken,
) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
    let creds = if cfg.api_key.is_empty() {
        crate::auth::ensure_fresh_provider_token(client, "openai-codex").await?
    } else {
        // Caller supplied a token directly — wrap it in the credential shape.
        crate::auth::OAuthCredentials {
            auth_type: "oauth".to_string(),
            refresh: String::new(),
            access: cfg.api_key.clone(),
            expires: 0,
            account_id: None,
        }
    };
    let account_id = creds
        .account_id
        .clone()
        .or_else(|| crate::auth::extract_codex_account_id(&creds.access))
        .ok_or("Failed to extract ChatGPT account id from Codex token")?;

    let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
    let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
    // The Responses API uses a flattened tool shape (no nested `function`).
    let tools: Vec<Value> = oai_tools
        .into_iter()
        .map(|tool| {
            json!({
                "type": "function",
                "name": tool.function.name,
                "description": tool.function.description.unwrap_or_default(),
                "parameters": tool.function.parameters,
            })
        })
        .collect();

    let mut body = json!({
        "model": cfg.model,
        "store": false,
        "stream": true,
        "instructions": codex_instructions(system_prompt),
        "input": codex_input_messages(oai_messages),
        "tool_choice": "auto",
        "parallel_tool_calls": true,
        "include": ["reasoning.encrypted_content"],
        "text": { "verbosity": "medium" },
    });
    if !tools.is_empty() {
        body["tools"] = Value::Array(tools);
    }
    if let Some(temp) = temperature {
        body["temperature"] = json!(temp);
    }
    if let Some(max) = max_tokens {
        body["max_output_tokens"] = json!(max);
    }

    // Normalize the base URL so the path ends in exactly one `/codex/responses`,
    // whether or not the configured base already carries a `/codex` suffix.
    let url = format!(
        "{}/codex/responses",
        cfg.base_url.trim_end_matches('/').trim_end_matches("/codex")
    );
    tracing::debug!(url=%url, model=%cfg.model, "codex stream request");

    let resp = client
        .post(&url)
        .bearer_auth(&creds.access)
        .header("chatgpt-account-id", account_id)
        .header("originator", "synaps")
        .header("OpenAI-Beta", "responses=experimental")
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .json(&body)
        .send()
        .await?;

    if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        return Err(format!("codex request failed: {status}: {text}").into());
    }

    let mut accumulated_text = String::new();
    let mut parser = CodexSseDecoder::default();
    let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
    let mut stream = resp.bytes_stream();

    // Pump the SSE byte stream line-by-line, honoring cancellation.
    while let Some(chunk) = tokio::select! {
        chunk = stream.next() => chunk,
        _ = cancel.cancelled() => {
            return Err("request canceled".into());
        }
    } {
        let chunk = chunk?;
        buf.extend_from_slice(&chunk);
        while let Some(nl) = memchr::memchr(b'\n', &buf) {
            let line_bytes = buf.split_to(nl + 1);
            let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
            parser.push_line(line, tx, &mut accumulated_text);
        }
    }
    // Flush a trailing line with no final newline, then finalize open tool calls.
    if !buf.is_empty() {
        let line = std::str::from_utf8(&buf).unwrap_or("");
        parser.push_line(line, tx, &mut accumulated_text);
    }
    parser.finish();

    // Build the Anthropic-shaped final response.
    let mut content: Vec<Value> = Vec::new();
    if !accumulated_text.is_empty() {
        content.push(json!({"type": "text", "text": accumulated_text}));
    }
    content.extend(translate::tool_calls_to_content_blocks(&parser.completed_tools, &name_map));

    Ok(json!({
        "role": "assistant",
        "content": content,
    }))
}
271
/// Policy text appended to the system prompt so Codex keeps working
/// autonomously instead of pausing at plan/phase boundaries.
const CODEX_AUTONOMOUS_LOOP_POLICY: &str = "\n\n[Synaps autonomous harness policy]\nThis harness is non-interactive after the user has provided the task/spec. Do not stop at phase boundaries, milestones, checkpoints, or after presenting a plan unless the full requested job is complete. Do not ask the user whether to continue. When a phase/checkpoint is reached, run any relevant verification and continue autonomously until the full requested job is complete, blocked by an unrecoverable error, or explicit user instructions require stopping.\n[End Synaps autonomous harness policy]";

/// Build the Codex `instructions` field: the caller's system prompt with the
/// Synaps autonomous-harness policy appended. Idempotent — a prompt that
/// already carries the policy marker is returned unchanged.
fn codex_instructions(system_prompt: &Option<String>) -> String {
    let base = match system_prompt {
        Some(prompt) => prompt.clone(),
        None => String::new(),
    };
    if base.contains("[Synaps autonomous harness policy]") {
        base
    } else {
        let mut combined = base;
        combined.push_str(CODEX_AUTONOMOUS_LOOP_POLICY);
        combined
    }
}
282
283fn codex_input_messages(messages: Vec<ChatMessage>) -> Vec<Value> {
284    let mut out = Vec::new();
285    for msg in messages {
286        if let Some(tool_calls) = msg.tool_calls {
287            for call in tool_calls {
288                // The Responses API rejects `id` values that are not the
289                // original `fc_…` output-item id. We only carry the
290                // `call_…` correlation id today (see types::ToolCall),
291                // so emit `id` *only* when the value actually starts
292                // with `fc`. `call_id` is sufficient on its own to
293                // correlate the eventual `function_call_output`.
294                let mut item = json!({
295                    "type": "function_call",
296                    "call_id": call.id,
297                    "name": call.function.name,
298                    "arguments": call.function.arguments,
299                });
300                if call.id.starts_with("fc") {
301                    item["id"] = Value::from(call.id.clone());
302                }
303                out.push(item);
304            }
305            continue;
306        }
307        if msg.role == "tool" {
308            // Skip tool results with no call_id — sending an empty call_id
309            // to the Codex API would cause a 400 with a confusing error.
310            if let Some(call_id) = msg.tool_call_id {
311                out.push(json!({
312                    "type": "function_call_output",
313                    "call_id": call_id,
314                    "output": msg.content.unwrap_or_default(),
315                }));
316            }
317            continue;
318        }
319        out.push(json!({
320            "role": msg.role,
321            "content": msg.content.unwrap_or_default(),
322        }));
323    }
324    out
325}
326
/// Incremental decoder for the Codex Responses-API SSE stream.
///
/// Accumulates `data:` payload fragments line-by-line and tracks in-flight
/// function calls (keyed by the event's `output_index`) until they complete.
#[derive(Default)]
struct CodexSseDecoder {
    // Partial `data:` payload for the current SSE event; flushed when a
    // blank line (event boundary) arrives.
    buffer: String,
    // In-progress tool calls, indexed by `output_index`.
    active_tools: Vec<CodexToolAccumulator>,
    // Finalized tool calls, deduplicated by id.
    completed_tools: Vec<ToolCall>,
}
333
/// Accumulates a single streaming function call while its fragments arrive.
#[derive(Default)]
struct CodexToolAccumulator {
    // `call_id` (preferred) or item `id` reported by the Responses API.
    id: String,
    // Tool/function name.
    name: String,
    // Raw JSON arguments string, appended delta-by-delta.
    arguments: String,
    // True once a `ToolUseStart` event has been emitted for this call.
    started: bool,
}
341
342/// Parse a function-call arguments string into a JSON `Value`, mirroring
343/// `runtime::api::parse_tool_input` so the chat UI's `LlmEvent::ToolUse`
344/// handling sees the same shape regardless of provider.
345///
346/// Empty / whitespace input becomes `{}`. Invalid JSON becomes
347/// `{"__parse_error": "..."}` — the agent loop already understands that
348/// shape and converts it into an `is_error: true` tool_result.
349fn parse_tool_arguments(raw: &str) -> Value {
350    if raw.trim().is_empty() {
351        return json!({});
352    }
353    match serde_json::from_str(raw) {
354        Ok(v) => v,
355        Err(e) => json!({ "__parse_error": format!("invalid tool input JSON: {}", e) }),
356    }
357}
358
impl CodexSseDecoder {
    /// Feed one raw SSE line into the decoder.
    ///
    /// A blank line terminates the current event and dispatches its buffered
    /// payload; `data:` lines append their payload; `[DONE]` finalizes any
    /// in-flight tool calls. Other SSE fields (`event:`, `id:`, …) are ignored.
    fn push_line(
        &mut self,
        line: &str,
        tx: &mpsc::UnboundedSender<StreamEvent>,
        text_acc: &mut String,
    ) {
        let line = line.trim_end_matches('\r');
        if line.is_empty() {
            // Event boundary: dispatch whatever payload has been buffered.
            if !self.buffer.is_empty() {
                let payload = std::mem::take(&mut self.buffer);
                self.push_payload(&payload, tx, text_acc);
            }
            return;
        }
        let Some(data) = line.strip_prefix("data:").map(str::trim_start) else {
            return;
        };
        if data == "[DONE]" {
            self.finish();
            return;
        }
        self.buffer.push_str(data);
    }

    /// Parse one complete `data:` payload and translate it into stream events.
    ///
    /// Unknown event types and unparsable payloads are silently ignored.
    fn push_payload(
        &mut self,
        payload: &str,
        tx: &mpsc::UnboundedSender<StreamEvent>,
        text_acc: &mut String,
    ) {
        let Ok(event) = serde_json::from_str::<Value>(payload) else {
            return;
        };
        let event_type = event.get("type").and_then(Value::as_str).unwrap_or_default();
        match event_type {
            "response.output_text.delta" => {
                // Assistant text: accumulate locally and forward downstream.
                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
                    text_acc.push_str(delta);
                    let _ = tx.send(StreamEvent::Llm(crate::runtime::types::LlmEvent::Text(
                        delta.to_string(),
                    )));
                }
            }
            "response.output_item.added" => {
                if let Some(item) = event.get("item") {
                    let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
                    self.add_tool_from_item(idx, item, tx);
                }
            }
            "response.function_call_arguments.delta" => {
                let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
                let delta = event.get("delta").and_then(Value::as_str).unwrap_or_default();
                if !delta.is_empty() {
                    let tool = self.ensure_tool(idx);
                    tool.arguments.push_str(delta);
                    let tool_id = tool.id.clone();
                    let _ = tx.send(StreamEvent::Llm(
                        crate::runtime::types::LlmEvent::ToolUseDelta {
                            tool_id,
                            delta: delta.to_string(),
                        },
                    ));
                }
            }
            "response.output_item.done" => {
                if let Some(item) = event.get("item") {
                    let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
                    self.complete_tool_from_item(idx, item, tx);
                }
            }
            "response.completed" | "response.done" => {
                self.push_usage(&event, tx);
                self.finish();
            }
            _ => {}
        }
    }

    /// Return the accumulator at `idx`, growing the list with defaults as needed.
    fn ensure_tool(&mut self, idx: usize) -> &mut CodexToolAccumulator {
        while self.active_tools.len() <= idx {
            self.active_tools.push(CodexToolAccumulator::default());
        }
        &mut self.active_tools[idx]
    }

    /// Record a newly-announced `function_call` output item and emit a
    /// `ToolUseStart` event once both an id and a name are known.
    fn add_tool_from_item(
        &mut self,
        idx: usize,
        item: &Value,
        tx: &mpsc::UnboundedSender<StreamEvent>,
    ) {
        if item.get("type").and_then(Value::as_str) != Some("function_call") {
            return;
        }
        let tool = self.ensure_tool(idx);
        // Prefer `call_id` (the correlation id); fall back to the item `id`.
        if let Some(id) = item
            .get("call_id")
            .or_else(|| item.get("id"))
            .and_then(Value::as_str)
        {
            tool.id = id.to_string();
        }
        if let Some(name) = item.get("name").and_then(Value::as_str) {
            tool.name = name.to_string();
        }
        if !tool.started && !tool.name.is_empty() {
            tool.started = true;
            let _ = tx.send(StreamEvent::Llm(
                crate::runtime::types::LlmEvent::ToolUseStart {
                    tool_name: tool.name.clone(),
                    tool_id: tool.id.clone(),
                },
            ));
        }
    }

    /// Finalize the `function_call` at `idx`: absorb the authoritative
    /// id/name/arguments from the done item, emit a late `ToolUseStart` if
    /// none was sent yet, then emit the final `ToolUse` event and record the
    /// completed call (deduplicated by id).
    fn complete_tool_from_item(
        &mut self,
        idx: usize,
        item: &Value,
        tx: &mpsc::UnboundedSender<StreamEvent>,
    ) {
        if item.get("type").and_then(Value::as_str) != Some("function_call") {
            return;
        }
        let tool = self.ensure_tool(idx);
        if let Some(id) = item
            .get("call_id")
            .or_else(|| item.get("id"))
            .and_then(Value::as_str)
        {
            tool.id = id.to_string();
        }
        if let Some(name) = item.get("name").and_then(Value::as_str) {
            tool.name = name.to_string();
        }
        // The done item carries the complete arguments string — prefer it
        // over whatever was accumulated from deltas.
        if let Some(arguments) = item.get("arguments").and_then(Value::as_str) {
            tool.arguments = arguments.to_string();
        }
        if !tool.started && !tool.name.is_empty() {
            tool.started = true;
            let _ = tx.send(StreamEvent::Llm(
                crate::runtime::types::LlmEvent::ToolUseStart {
                    tool_name: tool.name.clone(),
                    tool_id: tool.id.clone(),
                },
            ));
        }
        let completed = if !tool.id.is_empty() && !tool.name.is_empty() {
            Some(ToolCall {
                id: tool.id.clone(),
                kind: "function".to_string(),
                function: super::types::FunctionCall {
                    name: tool.name.clone(),
                    arguments: tool.arguments.clone(),
                },
            })
        } else {
            None
        };
        if let Some(call) = completed {
            if self.completed_tools.iter().any(|done| done.id == call.id) {
                return;
            }
            // Emit the finalized `ToolUse` event so the chat UI can collapse
            // the streaming `ToolUseStart` (animated) into a stable
            // `ToolUse` block. Without this the bash-trace animation
            // persists forever and parallel tool blocks render as "still
            // running" even after they've completed. Mirrors the
            // Anthropic path in `runtime/api.rs` which emits the same
            // event on tool-use content_block_stop.
            let input = parse_tool_arguments(&call.function.arguments);
            let _ = tx.send(StreamEvent::Llm(
                crate::runtime::types::LlmEvent::ToolUse {
                    tool_name: call.function.name.clone(),
                    tool_id: call.id.clone(),
                    input,
                },
            ));
            self.completed_tools.push(ToolCall {
                id: call.id,
                kind: call.kind,
                function: call.function,
            });
        }
    }

    /// Forward token usage from a `response.completed`/`response.done` event.
    /// Usage may live under `response.usage` or top-level `usage`; nothing is
    /// sent when both counts are zero/absent.
    fn push_usage(&self, event: &Value, tx: &mpsc::UnboundedSender<StreamEvent>) {
        let usage = event
            .get("response")
            .and_then(|r| r.get("usage"))
            .or_else(|| event.get("usage"));
        let input = usage
            .and_then(|u| u.get("input_tokens"))
            .and_then(Value::as_u64)
            .unwrap_or(0);
        let output = usage
            .and_then(|u| u.get("output_tokens"))
            .and_then(Value::as_u64)
            .unwrap_or(0);
        if input > 0 || output > 0 {
            let _ = tx.send(StreamEvent::Session(crate::runtime::types::SessionEvent::Usage {
                input_tokens: input,
                output_tokens: output,
                cache_read_input_tokens: 0,
                cache_creation_input_tokens: 0,
                model: None,
            }));
        }
    }

    /// Promote any still-active accumulators with a usable id+name into
    /// `completed_tools`, deduplicated by id. Safe to call more than once
    /// (e.g. on `[DONE]` and again when the byte stream ends).
    fn finish(&mut self) {
        for tool in self.active_tools.drain(..) {
            if !tool.id.is_empty()
                && !tool.name.is_empty()
                && !self.completed_tools.iter().any(|done| done.id == tool.id)
            {
                self.completed_tools.push(ToolCall {
                    id: tool.id,
                    kind: "function".to_string(),
                    function: super::types::FunctionCall {
                        name: tool.name,
                        arguments: tool.arguments,
                    },
                });
            }
        }
    }
}
589
590fn handle_events(
591    events: &[OaiEvent],
592    tx: &mpsc::UnboundedSender<StreamEvent>,
593    text_acc: &mut String,
594    tool_blocks: &mut Vec<Value>,
595    name_map: &translate::ToolNameMap,
596) {
597    for ev in events {
598        if let OaiEvent::TextDelta(t) = ev {
599            text_acc.push_str(t);
600        }
601        if let OaiEvent::ToolCallsComplete { calls, .. } = ev {
602            tool_blocks.extend(translate::tool_calls_to_content_blocks(calls, name_map));
603        }
604        if let Some(se) = translate::oai_event_to_llm(ev) {
605            let _ = tx.send(se);
606        }
607    }
608}
609
#[cfg(test)]
mod codex_input_messages_tests {
    //! Regression tests for the Codex Responses-API `input` shape.
    //!
    //! Background: the Responses API distinguishes two ids per tool
    //! invocation — `id` (the *output item id*, prefix `fc_…`) and
    //! `call_id` (the *function call id*, prefix `call_…`). When echoing
    //! a previous `function_call` back as an input item, supplying an
    //! `id` whose value is *not* a `fc_…` triggers
    //!
    //!   400 Bad Request: Invalid 'input[N].id': 'call_…'.
    //!   Expected an ID that begins with 'fc'.
    //!
    //! `id` is *optional* on input items — only `call_id` is required to
    //! correlate the eventual `function_call_output`. We elect not to
    //! emit `id` unless we actually have a real `fc_…` value to send.

    use super::*;
    use super::super::types::{ChatMessage, FunctionCall, ToolCall};

    /// A representative assistant tool call carrying a `call_…` id (the
    /// common case — no `fc_…` output-item id available).
    fn sample_tool_call() -> ToolCall {
        ToolCall {
            id: "call_nZYquCuGUh8Qs9H51dwHMDgs".to_string(),
            kind: "function".to_string(),
            function: FunctionCall {
                name: "bash".to_string(),
                arguments: r#"{"command":"ls"}"#.to_string(),
            },
        }
    }

    #[test]
    fn codex_instructions_appends_autonomous_loop_policy() {
        let instructions = codex_instructions(&Some("Project-specific rules.".to_string()));
        assert!(instructions.contains("Project-specific rules."));
        assert!(instructions.contains("Do not stop at phase boundaries"));
        assert!(instructions.contains("Do not ask the user whether to continue"));
        assert!(instructions.contains("continue autonomously until the full requested job is complete"));
    }

    #[test]
    fn function_call_input_omits_non_fc_id() {
        let messages = vec![ChatMessage::assistant_tool_calls(vec![sample_tool_call()])];
        let out = codex_input_messages(messages);
        assert_eq!(out.len(), 1, "one tool_call → one input item");
        let item = &out[0];
        assert_eq!(item.get("type").and_then(Value::as_str), Some("function_call"));
        assert!(
            item.get("id").is_none(),
            "must not echo a non-`fc_` id back; got {:?}",
            item.get("id"),
        );
        assert_eq!(
            item.get("call_id").and_then(Value::as_str),
            Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
        );
        assert_eq!(item.get("name").and_then(Value::as_str), Some("bash"));
    }

    #[test]
    fn function_call_input_keeps_real_fc_id() {
        // If we ever do have a genuine `fc_…` id (round-tripped from the
        // Responses API), we *should* echo it.
        let mut call = sample_tool_call();
        call.id = "fc_abc123".to_string();
        let messages = vec![ChatMessage::assistant_tool_calls(vec![call])];
        let out = codex_input_messages(messages);
        let item = &out[0];
        assert_eq!(item.get("id").and_then(Value::as_str), Some("fc_abc123"));
        assert_eq!(item.get("call_id").and_then(Value::as_str), Some("fc_abc123"));
    }

    #[test]
    fn function_call_output_round_trips_call_id() {
        // The follow-up tool message must reference the original call_id.
        let messages = vec![ChatMessage::tool_result(
            "call_nZYquCuGUh8Qs9H51dwHMDgs",
            "bash",
            "total 0",
        )];
        let out = codex_input_messages(messages);
        let item = &out[0];
        assert_eq!(
            item.get("type").and_then(Value::as_str),
            Some("function_call_output"),
        );
        assert_eq!(
            item.get("call_id").and_then(Value::as_str),
            Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
        );
        assert_eq!(item.get("output").and_then(Value::as_str), Some("total 0"));
    }
}
703
704#[cfg(test)]
705mod codex_decoder_tests {
706    //! Regression tests for `CodexSseDecoder`.
707    //!
708    //! The decoder is sync — we drive it via `push_line` and capture
709    //! emitted `StreamEvent`s from an `unbounded_channel` using
710    //! `try_recv`, no async runtime needed.
711
712    use super::*;
713    use crate::runtime::types::{LlmEvent, SessionEvent, StreamEvent};
714
715    fn collect_events(rx: &mut mpsc::UnboundedReceiver<StreamEvent>) -> Vec<StreamEvent> {
716        let mut out = Vec::new();
717        while let Ok(ev) = rx.try_recv() {
718            out.push(ev);
719        }
720        out
721    }
722
723    fn drive(lines: &[&str]) -> (CodexSseDecoder, String, Vec<StreamEvent>) {
724        let (tx, mut rx) = mpsc::unbounded_channel();
725        let mut decoder = CodexSseDecoder::default();
726        let mut text_acc = String::new();
727        for line in lines {
728            decoder.push_line(line, &tx, &mut text_acc);
729        }
730        let events = collect_events(&mut rx);
731        (decoder, text_acc, events)
732    }
733
734    #[test]
735    fn text_delta_aggregates_into_text_acc_and_emits_text_events() {
736        let lines = [
737            r#"data: {"type":"response.output_text.delta","delta":"Hello, "}"#,
738            "",
739            r#"data: {"type":"response.output_text.delta","delta":"world!"}"#,
740            "",
741        ];
742        let (_decoder, text_acc, events) = drive(&lines);
743        assert_eq!(text_acc, "Hello, world!");
744        let texts: Vec<_> = events
745            .iter()
746            .filter_map(|e| match e {
747                StreamEvent::Llm(LlmEvent::Text(t)) => Some(t.as_str()),
748                _ => None,
749            })
750            .collect();
751        assert_eq!(texts, vec!["Hello, ", "world!"]);
752    }
753
754    #[test]
755    fn single_function_call_completes_via_output_item_done() {
756        let lines = [
757            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
758            "",
759            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\""}"#,
760            "",
761            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":":\"ls\"}"}"#,
762            "",
763            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
764            "",
765        ];
766        let (decoder, _text, events) = drive(&lines);
767
768        assert_eq!(decoder.completed_tools.len(), 1);
769        let tool = &decoder.completed_tools[0];
770        assert_eq!(tool.id, "call_abc");
771        assert_eq!(tool.function.name, "bash");
772        assert_eq!(tool.function.arguments, r#"{"cmd":"ls"}"#);
773
774        // Exactly one ToolUseStart for the tool.
775        let starts: Vec<_> = events
776            .iter()
777            .filter_map(|e| match e {
778                StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
779                    Some((tool_name.as_str(), tool_id.as_str()))
780                }
781                _ => None,
782            })
783            .collect();
784        assert_eq!(
785            starts,
786            vec![("bash", "call_abc")],
787            "exactly one ToolUseStart with correct tool_id"
788        );
789
790        // Two argument deltas streamed (each carrying the tool_id so
791        // parallel calls can be routed correctly by the chat UI).
792        let deltas: Vec<_> = events
793            .iter()
794            .filter_map(|e| match e {
795                StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
796                    Some((tool_id.as_str(), delta.as_str()))
797                }
798                _ => None,
799            })
800            .collect();
801        assert_eq!(
802            deltas,
803            vec![("call_abc", r#"{"cmd""#), ("call_abc", r#":"ls"}"#)]
804        );
805    }
806
807    #[test]
808    fn parallel_tool_calls_indexed_separately() {
809        let lines = [
810            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
811            "",
812            r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
813            "",
814            r#"data: {"type":"response.function_call_arguments.delta","output_index":1,"delta":"{\"path\":\"a\"}"}"#,
815            "",
816            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\":\"ls\"}"}"#,
817            "",
818            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
819            "",
820            r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
821            "",
822        ];
823        let (decoder, _text, _events) = drive(&lines);
824
825        assert_eq!(decoder.completed_tools.len(), 2);
826        let mut by_id: std::collections::BTreeMap<&str, &ToolCall> = std::collections::BTreeMap::new();
827        for tool in &decoder.completed_tools {
828            by_id.insert(tool.id.as_str(), tool);
829        }
830        assert_eq!(by_id["call_1"].function.name, "bash");
831        assert_eq!(by_id["call_1"].function.arguments, r#"{"cmd":"ls"}"#);
832        assert_eq!(by_id["call_2"].function.name, "read");
833        assert_eq!(by_id["call_2"].function.arguments, r#"{"path":"a"}"#);
834    }
835
836    #[test]
837    fn output_item_done_emits_tool_use_event() {
838        // Regression: the codex decoder must emit `LlmEvent::ToolUse` once a
839        // function_call's `output_item.done` arrives so the chat UI can
840        // collapse `ChatMessage::ToolUseStart` (animated) into the finalized
841        // `ChatMessage::ToolUse`. Without this the bash-trace animation
842        // persists forever and parallel tool blocks render as "still
843        // running" even after they've completed.
844        let lines = [
845            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
846            "",
847            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"command\":\"ls\"}"}"#,
848            "",
849            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
850            "",
851        ];
852        let (_decoder, _text, events) = drive(&lines);
853
854        let tool_uses: Vec<_> = events
855            .iter()
856            .filter_map(|e| match e {
857                StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
858                    Some((tool_name.as_str(), tool_id.as_str(), input.clone()))
859                }
860                _ => None,
861            })
862            .collect();
863        assert_eq!(tool_uses.len(), 1, "expected exactly one ToolUse finalize event");
864        assert_eq!(tool_uses[0].0, "bash");
865        assert_eq!(tool_uses[0].1, "call_abc");
866        assert_eq!(
867            tool_uses[0].2,
868            serde_json::json!({"command": "ls"}),
869            "input must be parsed as a JSON Value, not a string"
870        );
871    }
872
873    #[test]
874    fn parallel_tool_calls_emit_tool_use_per_index() {
875        // Regression: parallel tool calls must each get their own ToolUse
876        // finalize event with the correct tool_id, so the chat UI can route
877        // their results back to the right block by id.
878        let lines = [
879            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
880            "",
881            r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
882            "",
883            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
884            "",
885            r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
886            "",
887        ];
888        let (_decoder, _text, events) = drive(&lines);
889
890        let tool_uses: Vec<_> = events
891            .iter()
892            .filter_map(|e| match e {
893                StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
894                    Some((tool_name.clone(), tool_id.clone(), input.clone()))
895                }
896                _ => None,
897            })
898            .collect();
899
900        assert_eq!(tool_uses.len(), 2, "one ToolUse finalize per parallel call");
901        let by_id: std::collections::BTreeMap<&str, &(String, String, serde_json::Value)> =
902            tool_uses.iter().map(|t| (t.1.as_str(), t)).collect();
903        assert_eq!(by_id["call_1"].0, "bash");
904        assert_eq!(by_id["call_1"].2, serde_json::json!({"command": "ls"}));
905        assert_eq!(by_id["call_2"].0, "read");
906        assert_eq!(by_id["call_2"].2, serde_json::json!({"path": "a"}));
907    }
908
909    #[test]
910    fn malformed_arguments_emit_tool_use_with_parse_error() {
911        // If the model produces invalid JSON arguments, surface a structured
912        // parse error in the `input` (matching how the Anthropic path
913        // handles it via parse_tool_input) so the agent loop can return an
914        // error tool_result instead of silently dropping the tool.
915        let lines = [
916            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash"}}"#,
917            "",
918            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash","arguments":"{not json"}}"#,
919            "",
920        ];
921        let (_decoder, _text, events) = drive(&lines);
922
923        let tool_use = events.iter().find_map(|e| match e {
924            StreamEvent::Llm(LlmEvent::ToolUse { input, .. }) => Some(input.clone()),
925            _ => None,
926        });
927        let input = tool_use.expect("ToolUse event missing");
928        assert!(
929            input.get("__parse_error").and_then(Value::as_str).is_some(),
930            "malformed arguments must surface __parse_error, got {input}"
931        );
932    }
933
934    #[test]
935    fn response_completed_emits_usage_event() {
936        let lines = [
937            r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":42,"output_tokens":17}}}"#,
938            "",
939        ];
940        let (_decoder, _text, events) = drive(&lines);
941        let usage = events.iter().find_map(|e| match e {
942            StreamEvent::Session(SessionEvent::Usage {
943                input_tokens,
944                output_tokens,
945                ..
946            }) => Some((*input_tokens, *output_tokens)),
947            _ => None,
948        });
949        assert_eq!(usage, Some((42, 17)));
950    }
951
952    #[test]
953    fn response_completed_with_zero_usage_emits_nothing() {
954        let lines = [
955            r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":0,"output_tokens":0}}}"#,
956            "",
957        ];
958        let (_decoder, _text, events) = drive(&lines);
959        let any_usage = events.iter().any(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })));
960        assert!(!any_usage, "zero-token usage should be suppressed");
961    }
962
963    #[test]
964    fn done_marker_finishes_decoder() {
965        let lines = [
966            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_x","name":"bash"}}"#,
967            "",
968            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{}"}"#,
969            "",
970            "data: [DONE]",
971            "",
972        ];
973        let (decoder, _text, _events) = drive(&lines);
974        // active_tools promoted to completed_tools by finish().
975        assert_eq!(decoder.completed_tools.len(), 1);
976        assert_eq!(decoder.completed_tools[0].id, "call_x");
977        assert_eq!(decoder.completed_tools[0].function.arguments, "{}");
978    }
979
980    #[test]
981    fn finish_is_idempotent_no_double_emit() {
982        let lines = [
983            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_y","name":"bash","arguments":"{}"}}"#,
984            "",
985        ];
986        let (mut decoder, _text, _events) = drive(&lines);
987        assert_eq!(decoder.completed_tools.len(), 1);
988
989        // Calling finish() again must not duplicate the tool.
990        decoder.finish();
991        assert_eq!(
992            decoder.completed_tools.len(),
993            1,
994            "finish() called twice must not double-emit"
995        );
996    }
997
998    #[test]
999    fn finish_drains_active_tools_for_state_hygiene() {
1000        // After [DONE], any leftover active tool entries should have been
1001        // promoted *and* drained from `active_tools`. This guards against
1002        // future code paths that re-call finish() (or new event types
1003        // that would otherwise re-iterate the old buffer).
1004        let lines = [
1005            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_z","name":"bash"}}"#,
1006            "",
1007            "data: [DONE]",
1008            "",
1009        ];
1010        let (decoder, _text, _events) = drive(&lines);
1011        assert_eq!(decoder.completed_tools.len(), 1);
1012        assert!(
1013            decoder.active_tools.is_empty(),
1014            "active_tools must be drained after finish()"
1015        );
1016    }
1017
1018    #[test]
1019    fn unknown_event_types_are_ignored() {
1020        let lines = [
1021            r#"data: {"type":"response.future_unknown_event","payload":{"x":1}}"#,
1022            "",
1023            r#"data: {"type":"response.output_text.delta","delta":"hi"}"#,
1024            "",
1025        ];
1026        let (_decoder, text_acc, _events) = drive(&lines);
1027        assert_eq!(text_acc, "hi");
1028    }
1029}