Skip to main content

synaps_cli/runtime/openai/stream.rs

1//! Streaming path for OpenAI-compatible providers.
2//!
3//! Mirrors `ApiMethods::call_api_stream_inner` but speaks OpenAI chat/completions
4//! and translates back to Anthropic-shaped events for the rest of the runtime.
5
6use super::translate;
7use super::types::{ChatMessage, OaiEvent, ProviderConfig, StreamOptions, ToolCall};
8use super::wire::StreamDecoder;
9use crate::runtime::types::StreamEvent;
10use futures::StreamExt;
11use serde_json::{json, Value};
12use tokio::sync::mpsc;
13
14/// Run a single streaming request against an OpenAI-compatible endpoint.
15///
16/// Returns the final assistant response as an Anthropic-shaped content Value
17/// (`{"content": [..text.., ..tool_use..]}`) so the outer agent loop can keep
18/// using the same handling as the native Anthropic path.
19#[allow(clippy::too_many_arguments)]
20pub(crate) async fn call_oai_stream_inner(
21    cfg: &ProviderConfig,
22    client: &reqwest::Client,
23    tools_schema: &[Value],
24    system_prompt: &Option<String>,
25    messages: &[Value],
26    tx: &mpsc::UnboundedSender<StreamEvent>,
27    temperature: Option<f32>,
28    max_tokens: Option<u32>,
29    thinking_budget: u32,
30    cancel: &tokio_util::sync::CancellationToken,
31) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
32    let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
33    let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
34    let tools_opt = if oai_tools.is_empty() { None } else { Some(oai_tools) };
35
36    // Google's OpenAI-compat endpoint rejects stream_options
37    let stream_options = if cfg.base_url.contains("googleapis.com") {
38        None
39    } else {
40        Some(StreamOptions { include_usage: true })
41    };
42
43    let mut body = serde_json::Map::new();
44    body.insert("model".to_string(), json!(cfg.model.clone()));
45    body.insert("messages".to_string(), serde_json::to_value(oai_messages)?);
46    body.insert("stream".to_string(), json!(true));
47    if let Some(stream_options) = stream_options {
48        body.insert("stream_options".to_string(), serde_json::to_value(stream_options)?);
49    }
50    if let Some(max_tokens) = max_tokens {
51        body.insert("max_tokens".to_string(), json!(max_tokens));
52    }
53    if let Some(temperature) = temperature {
54        body.insert("temperature".to_string(), json!(temperature));
55    }
56    if let Some(tools) = tools_opt {
57        body.insert("tools".to_string(), serde_json::to_value(tools)?);
58    }
59    super::reasoning::apply_openai_reasoning_params(
60        &mut body,
61        super::reasoning::provider_for_key(&cfg.provider),
62        &cfg.model,
63        thinking_budget,
64    );
65    let body = Value::Object(body);
66
67    let url = format!("{}/chat/completions", cfg.base_url.trim_end_matches('/'));
68
69    tracing::debug!(url=%url, model=%cfg.model, "openai stream request");
70
71    let resp = match client
72        .post(&url)
73        .bearer_auth(&cfg.api_key)
74        .header("content-type", "application/json")
75        .header("accept", "text/event-stream")
76        .json(&body)
77        .send()
78        .await
79    {
80        Ok(r) => r,
81        Err(e) => {
82            if e.is_connect() && url.contains("localhost") {
83                return Err(format!(
84                    "Can't reach local endpoint at {} — is Ollama/LM Studio running?",
85                    url
86                ).into());
87            }
88            return Err(e.into());
89        }
90    };
91
92    if !resp.status().is_success() {
93        let status = resp.status();
94        let text = resp.text().await.unwrap_or_default();
95        return Err(format!("openai request failed: {status}: {text}").into());
96    }
97
98    let mut decoder = StreamDecoder::new();
99    let mut accumulated_text = String::new();
100    let mut tool_use_blocks: Vec<Value> = Vec::new();
101    let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
102    let mut sink: Vec<OaiEvent> = Vec::with_capacity(4);
103    let mut stream = resp.bytes_stream();
104
105    while let Some(chunk) = tokio::select! {
106        chunk = stream.next() => chunk,
107        _ = cancel.cancelled() => {
108            return Err("request canceled".into());
109        }
110    } {
111        let chunk = chunk?;
112        buf.extend_from_slice(&chunk);
113
114        // Scan for newline-delimited SSE lines (SIMD-accelerated via memchr)
115        while let Some(nl) = memchr::memchr(b'\n', &buf) {
116            let line_bytes = buf.split_to(nl + 1); // O(1) — ref-counted split
117            let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
118
119            sink.clear();
120            decoder.push_line(line, &mut sink);
121            handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
122        }
123    }
124
125    // Flush any remaining buffered line + final Done
126    if !buf.is_empty() {
127        let line = std::str::from_utf8(&buf).unwrap_or("");
128        sink.clear();
129        decoder.push_line(line, &mut sink);
130        handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
131    }
132    sink.clear();
133    decoder.finish(&mut sink);
134    handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
135
136    // Build Anthropic-shaped final response
137    let mut content: Vec<Value> = Vec::new();
138    if !accumulated_text.is_empty() {
139        content.push(json!({"type": "text", "text": accumulated_text}));
140    }
141    content.extend(tool_use_blocks);
142
143    Ok(json!({
144        "role": "assistant",
145        "content": content,
146    }))
147}
148
149#[allow(clippy::too_many_arguments)]
150pub(crate) async fn call_codex_stream_inner(
151    cfg: &ProviderConfig,
152    client: &reqwest::Client,
153    tools_schema: &[Value],
154    system_prompt: &Option<String>,
155    messages: &[Value],
156    tx: &mpsc::UnboundedSender<StreamEvent>,
157    temperature: Option<f32>,
158    max_tokens: Option<u32>,
159    cancel: &tokio_util::sync::CancellationToken,
160) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
161    let creds = if cfg.api_key.is_empty() {
162        crate::auth::ensure_fresh_provider_token(client, "openai-codex").await?
163    } else {
164        crate::auth::OAuthCredentials {
165            auth_type: "oauth".to_string(),
166            refresh: String::new(),
167            access: cfg.api_key.clone(),
168            expires: 0,
169            account_id: None,
170        }
171    };
172    let account_id = creds
173        .account_id
174        .clone()
175        .or_else(|| crate::auth::extract_codex_account_id(&creds.access))
176        .ok_or("Failed to extract ChatGPT account id from Codex token")?;
177
178    let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
179    let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
180    let tools: Vec<Value> = oai_tools
181        .into_iter()
182        .map(|tool| {
183            json!({
184                "type": "function",
185                "name": tool.function.name,
186                "description": tool.function.description.unwrap_or_default(),
187                "parameters": tool.function.parameters,
188            })
189        })
190        .collect();
191
192    let mut body = json!({
193        "model": cfg.model,
194        "store": false,
195        "stream": true,
196        "instructions": codex_instructions(system_prompt),
197        "input": codex_input_messages(oai_messages),
198        "tool_choice": "auto",
199        "parallel_tool_calls": true,
200        "include": ["reasoning.encrypted_content"],
201        "text": { "verbosity": "medium" },
202    });
203    if !tools.is_empty() {
204        body["tools"] = Value::Array(tools);
205    }
206    if let Some(temp) = temperature {
207        body["temperature"] = json!(temp);
208    }
209    if let Some(max) = max_tokens {
210        body["max_output_tokens"] = json!(max);
211    }
212
213    let url = format!(
214        "{}/codex/responses",
215        cfg.base_url.trim_end_matches('/').trim_end_matches("/codex")
216    );
217    tracing::debug!(url=%url, model=%cfg.model, "codex stream request");
218
219    let resp = client
220        .post(&url)
221        .bearer_auth(&creds.access)
222        .header("chatgpt-account-id", account_id)
223        .header("originator", "synaps")
224        .header("OpenAI-Beta", "responses=experimental")
225        .header("content-type", "application/json")
226        .header("accept", "text/event-stream")
227        .json(&body)
228        .send()
229        .await?;
230
231    if !resp.status().is_success() {
232        let status = resp.status();
233        let text = resp.text().await.unwrap_or_default();
234        return Err(format!("codex request failed: {status}: {text}").into());
235    }
236
237    let mut accumulated_text = String::new();
238    let mut parser = CodexSseDecoder::default();
239    let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
240    let mut stream = resp.bytes_stream();
241
242    while let Some(chunk) = tokio::select! {
243        chunk = stream.next() => chunk,
244        _ = cancel.cancelled() => {
245            return Err("request canceled".into());
246        }
247    } {
248        let chunk = chunk?;
249        buf.extend_from_slice(&chunk);
250        while let Some(nl) = memchr::memchr(b'\n', &buf) {
251            let line_bytes = buf.split_to(nl + 1);
252            let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
253            parser.push_line(line, tx, &mut accumulated_text);
254        }
255    }
256    if !buf.is_empty() {
257        let line = std::str::from_utf8(&buf).unwrap_or("");
258        parser.push_line(line, tx, &mut accumulated_text);
259    }
260    parser.finish();
261
262    let mut content: Vec<Value> = Vec::new();
263    if !accumulated_text.is_empty() {
264        content.push(json!({"type": "text", "text": accumulated_text}));
265    }
266    content.extend(translate::tool_calls_to_content_blocks(&parser.completed_tools, &name_map));
267
268    Ok(json!({
269        "role": "assistant",
270        "content": content,
271    }))
272}
273
/// Policy text appended to the Codex `instructions` field, telling the model
/// the harness is non-interactive and it should keep going until the whole
/// task is done. The bracketed opening marker doubles as the "already
/// present" sentinel checked by `codex_instructions`.
const CODEX_AUTONOMOUS_LOOP_POLICY: &str = "\n\n[Synaps autonomous harness policy]\nThis harness is non-interactive after the user has provided the task/spec. Do not stop at phase boundaries, milestones, checkpoints, or after presenting a plan unless the full requested job is complete. Do not ask the user whether to continue. When a phase/checkpoint is reached, run any relevant verification and continue autonomously until the full requested job is complete, blocked by an unrecoverable error, or explicit user instructions require stopping.\n[End Synaps autonomous harness policy]";

/// Build the Codex `instructions` string from the optional system prompt.
///
/// Returns the prompt (or `""` when absent) followed by
/// `CODEX_AUTONOMOUS_LOOP_POLICY`. If the prompt already contains the
/// policy's opening marker, it is returned unchanged, so feeding this
/// function its own output never duplicates the policy.
fn codex_instructions(system_prompt: &Option<String>) -> String {
    let prompt: &str = system_prompt.as_deref().unwrap_or_default();
    let already_has_policy = prompt.contains("[Synaps autonomous harness policy]");
    if already_has_policy {
        prompt.to_owned()
    } else {
        [prompt, CODEX_AUTONOMOUS_LOOP_POLICY].concat()
    }
}
284
285fn codex_input_messages(messages: Vec<ChatMessage>) -> Vec<Value> {
286    let mut out = Vec::new();
287    for msg in messages {
288        if let Some(tool_calls) = msg.tool_calls {
289            for call in tool_calls {
290                // The Responses API rejects `id` values that are not the
291                // original `fc_…` output-item id. We only carry the
292                // `call_…` correlation id today (see types::ToolCall),
293                // so emit `id` *only* when the value actually starts
294                // with `fc`. `call_id` is sufficient on its own to
295                // correlate the eventual `function_call_output`.
296                let mut item = json!({
297                    "type": "function_call",
298                    "call_id": call.id,
299                    "name": call.function.name,
300                    "arguments": call.function.arguments,
301                });
302                if call.id.starts_with("fc") {
303                    item["id"] = Value::from(call.id.clone());
304                }
305                out.push(item);
306            }
307            continue;
308        }
309        if msg.role == "tool" {
310            // Skip tool results with no call_id — sending an empty call_id
311            // to the Codex API would cause a 400 with a confusing error.
312            if let Some(call_id) = msg.tool_call_id {
313                out.push(json!({
314                    "type": "function_call_output",
315                    "call_id": call_id,
316                    "output": msg.content.unwrap_or_default(),
317                }));
318            }
319            continue;
320        }
321        out.push(json!({
322            "role": msg.role,
323            "content": msg.content.unwrap_or_default(),
324        }));
325    }
326    out
327}
328
329#[derive(Default)]
330struct CodexSseDecoder {
331    buffer: String,
332    active_tools: Vec<CodexToolAccumulator>,
333    completed_tools: Vec<ToolCall>,
334}
335
336#[derive(Default)]
337struct CodexToolAccumulator {
338    id: String,
339    name: String,
340    arguments: String,
341    started: bool,
342}
343
344/// Parse a function-call arguments string into a JSON `Value`, mirroring
345/// `runtime::api::parse_tool_input` so the chat UI's `LlmEvent::ToolUse`
346/// handling sees the same shape regardless of provider.
347///
348/// Empty / whitespace input becomes `{}`. Invalid JSON becomes
349/// `{"__parse_error": "..."}` — the agent loop already understands that
350/// shape and converts it into an `is_error: true` tool_result.
351fn parse_tool_arguments(raw: &str) -> Value {
352    if raw.trim().is_empty() {
353        return json!({});
354    }
355    match serde_json::from_str(raw) {
356        Ok(v) => v,
357        Err(e) => json!({ "__parse_error": format!("invalid tool input JSON: {}", e) }),
358    }
359}
360
361impl CodexSseDecoder {
362    fn push_line(
363        &mut self,
364        line: &str,
365        tx: &mpsc::UnboundedSender<StreamEvent>,
366        text_acc: &mut String,
367    ) {
368        let line = line.trim_end_matches('\r');
369        if line.is_empty() {
370            if !self.buffer.is_empty() {
371                let payload = std::mem::take(&mut self.buffer);
372                self.push_payload(&payload, tx, text_acc);
373            }
374            return;
375        }
376        let Some(data) = line.strip_prefix("data:").map(str::trim_start) else {
377            return;
378        };
379        if data == "[DONE]" {
380            self.finish();
381            return;
382        }
383        self.buffer.push_str(data);
384    }
385
386    fn push_payload(
387        &mut self,
388        payload: &str,
389        tx: &mpsc::UnboundedSender<StreamEvent>,
390        text_acc: &mut String,
391    ) {
392        let Ok(event) = serde_json::from_str::<Value>(payload) else {
393            return;
394        };
395        let event_type = event.get("type").and_then(Value::as_str).unwrap_or_default();
396        match event_type {
397            "response.output_text.delta" => {
398                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
399                    text_acc.push_str(delta);
400                    let _ = tx.send(StreamEvent::Llm(crate::runtime::types::LlmEvent::Text(
401                        delta.to_string(),
402                    )));
403                }
404            }
405            "response.output_item.added" => {
406                if let Some(item) = event.get("item") {
407                    let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
408                    self.add_tool_from_item(idx, item, tx);
409                }
410            }
411            "response.function_call_arguments.delta" => {
412                let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
413                let delta = event.get("delta").and_then(Value::as_str).unwrap_or_default();
414                if !delta.is_empty() {
415                    let tool = self.ensure_tool(idx);
416                    tool.arguments.push_str(delta);
417                    let tool_id = tool.id.clone();
418                    let _ = tx.send(StreamEvent::Llm(
419                        crate::runtime::types::LlmEvent::ToolUseDelta {
420                            tool_id,
421                            delta: delta.to_string(),
422                        },
423                    ));
424                }
425            }
426            "response.output_item.done" => {
427                if let Some(item) = event.get("item") {
428                    let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
429                    self.complete_tool_from_item(idx, item, tx);
430                }
431            }
432            "response.completed" | "response.done" => {
433                self.push_usage(&event, tx);
434                self.finish();
435            }
436            _ => {}
437        }
438    }
439
440    fn ensure_tool(&mut self, idx: usize) -> &mut CodexToolAccumulator {
441        while self.active_tools.len() <= idx {
442            self.active_tools.push(CodexToolAccumulator::default());
443        }
444        &mut self.active_tools[idx]
445    }
446
447    fn add_tool_from_item(
448        &mut self,
449        idx: usize,
450        item: &Value,
451        tx: &mpsc::UnboundedSender<StreamEvent>,
452    ) {
453        if item.get("type").and_then(Value::as_str) != Some("function_call") {
454            return;
455        }
456        let tool = self.ensure_tool(idx);
457        if let Some(id) = item
458            .get("call_id")
459            .or_else(|| item.get("id"))
460            .and_then(Value::as_str)
461        {
462            tool.id = id.to_string();
463        }
464        if let Some(name) = item.get("name").and_then(Value::as_str) {
465            tool.name = name.to_string();
466        }
467        if !tool.started && !tool.name.is_empty() {
468            tool.started = true;
469            let _ = tx.send(StreamEvent::Llm(
470                crate::runtime::types::LlmEvent::ToolUseStart {
471                    tool_name: tool.name.clone(),
472                    tool_id: tool.id.clone(),
473                },
474            ));
475        }
476    }
477
478    fn complete_tool_from_item(
479        &mut self,
480        idx: usize,
481        item: &Value,
482        tx: &mpsc::UnboundedSender<StreamEvent>,
483    ) {
484        if item.get("type").and_then(Value::as_str) != Some("function_call") {
485            return;
486        }
487        let tool = self.ensure_tool(idx);
488        if let Some(id) = item
489            .get("call_id")
490            .or_else(|| item.get("id"))
491            .and_then(Value::as_str)
492        {
493            tool.id = id.to_string();
494        }
495        if let Some(name) = item.get("name").and_then(Value::as_str) {
496            tool.name = name.to_string();
497        }
498        if let Some(arguments) = item.get("arguments").and_then(Value::as_str) {
499            tool.arguments = arguments.to_string();
500        }
501        if !tool.started && !tool.name.is_empty() {
502            tool.started = true;
503            let _ = tx.send(StreamEvent::Llm(
504                crate::runtime::types::LlmEvent::ToolUseStart {
505                    tool_name: tool.name.clone(),
506                    tool_id: tool.id.clone(),
507                },
508            ));
509        }
510        let completed = if !tool.id.is_empty() && !tool.name.is_empty() {
511            Some(ToolCall {
512                id: tool.id.clone(),
513                kind: "function".to_string(),
514                function: super::types::FunctionCall {
515                    name: tool.name.clone(),
516                    arguments: tool.arguments.clone(),
517                },
518            })
519        } else {
520            None
521        };
522        if let Some(call) = completed {
523            if self.completed_tools.iter().any(|done| done.id == call.id) {
524                return;
525            }
526            // Emit the finalized `ToolUse` event so the chat UI can collapse
527            // the streaming `ToolUseStart` (animated) into a stable
528            // `ToolUse` block. Without this the bash-trace animation
529            // persists forever and parallel tool blocks render as "still
530            // running" even after they've completed. Mirrors the
531            // Anthropic path in `runtime/api.rs` which emits the same
532            // event on tool-use content_block_stop.
533            let input = parse_tool_arguments(&call.function.arguments);
534            let _ = tx.send(StreamEvent::Llm(
535                crate::runtime::types::LlmEvent::ToolUse {
536                    tool_name: call.function.name.clone(),
537                    tool_id: call.id.clone(),
538                    input,
539                },
540            ));
541            self.completed_tools.push(ToolCall {
542                id: call.id,
543                kind: call.kind,
544                function: call.function,
545            });
546        }
547    }
548
549    fn push_usage(&self, event: &Value, tx: &mpsc::UnboundedSender<StreamEvent>) {
550        let usage = event
551            .get("response")
552            .and_then(|r| r.get("usage"))
553            .or_else(|| event.get("usage"));
554        let input = usage
555            .and_then(|u| u.get("input_tokens"))
556            .and_then(Value::as_u64)
557            .unwrap_or(0);
558        let output = usage
559            .and_then(|u| u.get("output_tokens"))
560            .and_then(Value::as_u64)
561            .unwrap_or(0);
562        if input > 0 || output > 0 {
563            let _ = tx.send(StreamEvent::Session(crate::runtime::types::SessionEvent::Usage {
564                input_tokens: input,
565                output_tokens: output,
566                cache_read_input_tokens: 0,
567                cache_creation_input_tokens: 0,
568                model: None,
569            }));
570        }
571    }
572
573    fn finish(&mut self) {
574        for tool in self.active_tools.drain(..) {
575            if !tool.id.is_empty()
576                && !tool.name.is_empty()
577                && !self.completed_tools.iter().any(|done| done.id == tool.id)
578            {
579                self.completed_tools.push(ToolCall {
580                    id: tool.id,
581                    kind: "function".to_string(),
582                    function: super::types::FunctionCall {
583                        name: tool.name,
584                        arguments: tool.arguments,
585                    },
586                });
587            }
588        }
589    }
590}
591
592fn handle_events(
593    events: &[OaiEvent],
594    tx: &mpsc::UnboundedSender<StreamEvent>,
595    text_acc: &mut String,
596    tool_blocks: &mut Vec<Value>,
597    name_map: &translate::ToolNameMap,
598) {
599    for ev in events {
600        if let OaiEvent::TextDelta(t) = ev {
601            text_acc.push_str(t);
602        }
603        if let OaiEvent::ToolCallsComplete { calls, .. } = ev {
604            tool_blocks.extend(translate::tool_calls_to_content_blocks(calls, name_map));
605        }
606        if let Some(se) = translate::oai_event_to_llm(ev) {
607            let _ = tx.send(se);
608        }
609    }
610}
611
#[cfg(test)]
mod codex_input_messages_tests {
    //! Regression tests for the Codex Responses-API request shape: the
    //! `input` items built by `codex_input_messages` and the `instructions`
    //! string built by `codex_instructions`.
    //!
    //! Background: the Responses API distinguishes two ids per tool
    //! invocation — `id` (the *output item id*, prefix `fc_…`) and
    //! `call_id` (the *function call id*, prefix `call_…`). When echoing
    //! a previous `function_call` back as an input item, supplying an
    //! `id` whose value is *not* a `fc_…` triggers
    //!
    //!   400 Bad Request: Invalid 'input[N].id': 'call_…'.
    //!   Expected an ID that begins with 'fc'.
    //!
    //! `id` is *optional* on input items — only `call_id` is required to
    //! correlate the eventual `function_call_output`. We elect not to
    //! emit `id` unless we actually have a real `fc_…` value to send.

    use super::*;
    use super::super::types::{ChatMessage, FunctionCall, ToolCall};

    fn sample_tool_call() -> ToolCall {
        ToolCall {
            id: "call_nZYquCuGUh8Qs9H51dwHMDgs".to_string(),
            kind: "function".to_string(),
            function: FunctionCall {
                name: "bash".to_string(),
                arguments: r#"{"command":"ls"}"#.to_string(),
            },
        }
    }

    #[test]
    fn codex_instructions_appends_autonomous_loop_policy() {
        let instructions = codex_instructions(&Some("Project-specific rules.".to_string()));
        assert!(instructions.contains("Project-specific rules."));
        assert!(instructions.contains("Do not stop at phase boundaries"));
        assert!(instructions.contains("Do not ask the user whether to continue"));
        assert!(instructions.contains("continue autonomously until the full requested job is complete"));
    }

    #[test]
    fn codex_instructions_does_not_duplicate_policy() {
        // Re-running on output that already carries the policy must be a no-op.
        let once = codex_instructions(&Some("Project-specific rules.".to_string()));
        let twice = codex_instructions(&Some(once.clone()));
        assert_eq!(once, twice, "policy must be appended at most once");
        assert_eq!(once.matches("[Synaps autonomous harness policy]").count(), 1);
    }

    #[test]
    fn codex_instructions_without_system_prompt_is_policy_only() {
        assert_eq!(codex_instructions(&None), CODEX_AUTONOMOUS_LOOP_POLICY);
    }

    #[test]
    fn function_call_input_omits_non_fc_id() {
        let messages = vec![ChatMessage::assistant_tool_calls(vec![sample_tool_call()])];
        let out = codex_input_messages(messages);
        assert_eq!(out.len(), 1, "one tool_call → one input item");
        let item = &out[0];
        assert_eq!(item.get("type").and_then(Value::as_str), Some("function_call"));
        assert!(
            item.get("id").is_none(),
            "must not echo a non-`fc_` id back; got {:?}",
            item.get("id"),
        );
        assert_eq!(
            item.get("call_id").and_then(Value::as_str),
            Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
        );
        assert_eq!(item.get("name").and_then(Value::as_str), Some("bash"));
    }

    #[test]
    fn function_call_input_keeps_real_fc_id() {
        // If we ever do have a genuine `fc_…` id (round-tripped from the
        // Responses API), we *should* echo it.
        let mut call = sample_tool_call();
        call.id = "fc_abc123".to_string();
        let messages = vec![ChatMessage::assistant_tool_calls(vec![call])];
        let out = codex_input_messages(messages);
        let item = &out[0];
        assert_eq!(item.get("id").and_then(Value::as_str), Some("fc_abc123"));
        assert_eq!(item.get("call_id").and_then(Value::as_str), Some("fc_abc123"));
    }

    #[test]
    fn function_call_output_round_trips_call_id() {
        // The follow-up tool message must reference the original call_id.
        let messages = vec![ChatMessage::tool_result(
            "call_nZYquCuGUh8Qs9H51dwHMDgs",
            "bash",
            "total 0",
        )];
        let out = codex_input_messages(messages);
        let item = &out[0];
        assert_eq!(
            item.get("type").and_then(Value::as_str),
            Some("function_call_output"),
        );
        assert_eq!(
            item.get("call_id").and_then(Value::as_str),
            Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
        );
        assert_eq!(item.get("output").and_then(Value::as_str), Some("total 0"));
    }
}
705
706#[cfg(test)]
707mod codex_decoder_tests {
708    //! Regression tests for `CodexSseDecoder`.
709    //!
710    //! The decoder is sync — we drive it via `push_line` and capture
711    //! emitted `StreamEvent`s from an `unbounded_channel` using
712    //! `try_recv`, no async runtime needed.
713
714    use super::*;
715    use crate::runtime::types::{LlmEvent, SessionEvent, StreamEvent};
716
717    fn collect_events(rx: &mut mpsc::UnboundedReceiver<StreamEvent>) -> Vec<StreamEvent> {
718        let mut out = Vec::new();
719        while let Ok(ev) = rx.try_recv() {
720            out.push(ev);
721        }
722        out
723    }
724
725    fn drive(lines: &[&str]) -> (CodexSseDecoder, String, Vec<StreamEvent>) {
726        let (tx, mut rx) = mpsc::unbounded_channel();
727        let mut decoder = CodexSseDecoder::default();
728        let mut text_acc = String::new();
729        for line in lines {
730            decoder.push_line(line, &tx, &mut text_acc);
731        }
732        let events = collect_events(&mut rx);
733        (decoder, text_acc, events)
734    }
735
736    #[test]
737    fn text_delta_aggregates_into_text_acc_and_emits_text_events() {
738        let lines = [
739            r#"data: {"type":"response.output_text.delta","delta":"Hello, "}"#,
740            "",
741            r#"data: {"type":"response.output_text.delta","delta":"world!"}"#,
742            "",
743        ];
744        let (_decoder, text_acc, events) = drive(&lines);
745        assert_eq!(text_acc, "Hello, world!");
746        let texts: Vec<_> = events
747            .iter()
748            .filter_map(|e| match e {
749                StreamEvent::Llm(LlmEvent::Text(t)) => Some(t.as_str()),
750                _ => None,
751            })
752            .collect();
753        assert_eq!(texts, vec!["Hello, ", "world!"]);
754    }
755
756    #[test]
757    fn single_function_call_completes_via_output_item_done() {
758        let lines = [
759            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
760            "",
761            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\""}"#,
762            "",
763            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":":\"ls\"}"}"#,
764            "",
765            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
766            "",
767        ];
768        let (decoder, _text, events) = drive(&lines);
769
770        assert_eq!(decoder.completed_tools.len(), 1);
771        let tool = &decoder.completed_tools[0];
772        assert_eq!(tool.id, "call_abc");
773        assert_eq!(tool.function.name, "bash");
774        assert_eq!(tool.function.arguments, r#"{"cmd":"ls"}"#);
775
776        // Exactly one ToolUseStart for the tool.
777        let starts: Vec<_> = events
778            .iter()
779            .filter_map(|e| match e {
780                StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
781                    Some((tool_name.as_str(), tool_id.as_str()))
782                }
783                _ => None,
784            })
785            .collect();
786        assert_eq!(
787            starts,
788            vec![("bash", "call_abc")],
789            "exactly one ToolUseStart with correct tool_id"
790        );
791
792        // Two argument deltas streamed (each carrying the tool_id so
793        // parallel calls can be routed correctly by the chat UI).
794        let deltas: Vec<_> = events
795            .iter()
796            .filter_map(|e| match e {
797                StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
798                    Some((tool_id.as_str(), delta.as_str()))
799                }
800                _ => None,
801            })
802            .collect();
803        assert_eq!(
804            deltas,
805            vec![("call_abc", r#"{"cmd""#), ("call_abc", r#":"ls"}"#)]
806        );
807    }
808
809    #[test]
810    fn parallel_tool_calls_indexed_separately() {
811        let lines = [
812            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
813            "",
814            r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
815            "",
816            r#"data: {"type":"response.function_call_arguments.delta","output_index":1,"delta":"{\"path\":\"a\"}"}"#,
817            "",
818            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\":\"ls\"}"}"#,
819            "",
820            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
821            "",
822            r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
823            "",
824        ];
825        let (decoder, _text, _events) = drive(&lines);
826
827        assert_eq!(decoder.completed_tools.len(), 2);
828        let mut by_id: std::collections::BTreeMap<&str, &ToolCall> = std::collections::BTreeMap::new();
829        for tool in &decoder.completed_tools {
830            by_id.insert(tool.id.as_str(), tool);
831        }
832        assert_eq!(by_id["call_1"].function.name, "bash");
833        assert_eq!(by_id["call_1"].function.arguments, r#"{"cmd":"ls"}"#);
834        assert_eq!(by_id["call_2"].function.name, "read");
835        assert_eq!(by_id["call_2"].function.arguments, r#"{"path":"a"}"#);
836    }
837
838    #[test]
839    fn output_item_done_emits_tool_use_event() {
840        // Regression: the codex decoder must emit `LlmEvent::ToolUse` once a
841        // function_call's `output_item.done` arrives so the chat UI can
842        // collapse `ChatMessage::ToolUseStart` (animated) into the finalized
843        // `ChatMessage::ToolUse`. Without this the bash-trace animation
844        // persists forever and parallel tool blocks render as "still
845        // running" even after they've completed.
846        let lines = [
847            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
848            "",
849            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"command\":\"ls\"}"}"#,
850            "",
851            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
852            "",
853        ];
854        let (_decoder, _text, events) = drive(&lines);
855
856        let tool_uses: Vec<_> = events
857            .iter()
858            .filter_map(|e| match e {
859                StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
860                    Some((tool_name.as_str(), tool_id.as_str(), input.clone()))
861                }
862                _ => None,
863            })
864            .collect();
865        assert_eq!(tool_uses.len(), 1, "expected exactly one ToolUse finalize event");
866        assert_eq!(tool_uses[0].0, "bash");
867        assert_eq!(tool_uses[0].1, "call_abc");
868        assert_eq!(
869            tool_uses[0].2,
870            serde_json::json!({"command": "ls"}),
871            "input must be parsed as a JSON Value, not a string"
872        );
873    }
874
875    #[test]
876    fn parallel_tool_calls_emit_tool_use_per_index() {
877        // Regression: parallel tool calls must each get their own ToolUse
878        // finalize event with the correct tool_id, so the chat UI can route
879        // their results back to the right block by id.
880        let lines = [
881            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
882            "",
883            r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
884            "",
885            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
886            "",
887            r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
888            "",
889        ];
890        let (_decoder, _text, events) = drive(&lines);
891
892        let tool_uses: Vec<_> = events
893            .iter()
894            .filter_map(|e| match e {
895                StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
896                    Some((tool_name.clone(), tool_id.clone(), input.clone()))
897                }
898                _ => None,
899            })
900            .collect();
901
902        assert_eq!(tool_uses.len(), 2, "one ToolUse finalize per parallel call");
903        let by_id: std::collections::BTreeMap<&str, &(String, String, serde_json::Value)> =
904            tool_uses.iter().map(|t| (t.1.as_str(), t)).collect();
905        assert_eq!(by_id["call_1"].0, "bash");
906        assert_eq!(by_id["call_1"].2, serde_json::json!({"command": "ls"}));
907        assert_eq!(by_id["call_2"].0, "read");
908        assert_eq!(by_id["call_2"].2, serde_json::json!({"path": "a"}));
909    }
910
911    #[test]
912    fn malformed_arguments_emit_tool_use_with_parse_error() {
913        // If the model produces invalid JSON arguments, surface a structured
914        // parse error in the `input` (matching how the Anthropic path
915        // handles it via parse_tool_input) so the agent loop can return an
916        // error tool_result instead of silently dropping the tool.
917        let lines = [
918            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash"}}"#,
919            "",
920            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash","arguments":"{not json"}}"#,
921            "",
922        ];
923        let (_decoder, _text, events) = drive(&lines);
924
925        let tool_use = events.iter().find_map(|e| match e {
926            StreamEvent::Llm(LlmEvent::ToolUse { input, .. }) => Some(input.clone()),
927            _ => None,
928        });
929        let input = tool_use.expect("ToolUse event missing");
930        assert!(
931            input.get("__parse_error").and_then(Value::as_str).is_some(),
932            "malformed arguments must surface __parse_error, got {input}"
933        );
934    }
935
936    #[test]
937    fn response_completed_emits_usage_event() {
938        let lines = [
939            r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":42,"output_tokens":17}}}"#,
940            "",
941        ];
942        let (_decoder, _text, events) = drive(&lines);
943        let usage = events.iter().find_map(|e| match e {
944            StreamEvent::Session(SessionEvent::Usage {
945                input_tokens,
946                output_tokens,
947                ..
948            }) => Some((*input_tokens, *output_tokens)),
949            _ => None,
950        });
951        assert_eq!(usage, Some((42, 17)));
952    }
953
954    #[test]
955    fn response_completed_with_zero_usage_emits_nothing() {
956        let lines = [
957            r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":0,"output_tokens":0}}}"#,
958            "",
959        ];
960        let (_decoder, _text, events) = drive(&lines);
961        let any_usage = events.iter().any(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })));
962        assert!(!any_usage, "zero-token usage should be suppressed");
963    }
964
965    #[test]
966    fn done_marker_finishes_decoder() {
967        let lines = [
968            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_x","name":"bash"}}"#,
969            "",
970            r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{}"}"#,
971            "",
972            "data: [DONE]",
973            "",
974        ];
975        let (decoder, _text, _events) = drive(&lines);
976        // active_tools promoted to completed_tools by finish().
977        assert_eq!(decoder.completed_tools.len(), 1);
978        assert_eq!(decoder.completed_tools[0].id, "call_x");
979        assert_eq!(decoder.completed_tools[0].function.arguments, "{}");
980    }
981
982    #[test]
983    fn finish_is_idempotent_no_double_emit() {
984        let lines = [
985            r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_y","name":"bash","arguments":"{}"}}"#,
986            "",
987        ];
988        let (mut decoder, _text, _events) = drive(&lines);
989        assert_eq!(decoder.completed_tools.len(), 1);
990
991        // Calling finish() again must not duplicate the tool.
992        decoder.finish();
993        assert_eq!(
994            decoder.completed_tools.len(),
995            1,
996            "finish() called twice must not double-emit"
997        );
998    }
999
1000    #[test]
1001    fn finish_drains_active_tools_for_state_hygiene() {
1002        // After [DONE], any leftover active tool entries should have been
1003        // promoted *and* drained from `active_tools`. This guards against
1004        // future code paths that re-call finish() (or new event types
1005        // that would otherwise re-iterate the old buffer).
1006        let lines = [
1007            r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_z","name":"bash"}}"#,
1008            "",
1009            "data: [DONE]",
1010            "",
1011        ];
1012        let (decoder, _text, _events) = drive(&lines);
1013        assert_eq!(decoder.completed_tools.len(), 1);
1014        assert!(
1015            decoder.active_tools.is_empty(),
1016            "active_tools must be drained after finish()"
1017        );
1018    }
1019
1020    #[test]
1021    fn unknown_event_types_are_ignored() {
1022        let lines = [
1023            r#"data: {"type":"response.future_unknown_event","payload":{"x":1}}"#,
1024            "",
1025            r#"data: {"type":"response.output_text.delta","delta":"hi"}"#,
1026            "",
1027        ];
1028        let (_decoder, text_acc, _events) = drive(&lines);
1029        assert_eq!(text_acc, "hi");
1030    }
1031}