1use super::translate;
7use super::types::{ChatMessage, OaiEvent, ProviderConfig, StreamOptions, ToolCall};
8use super::wire::StreamDecoder;
9use crate::runtime::types::StreamEvent;
10use futures::StreamExt;
11use serde_json::{json, Value};
12use tokio::sync::mpsc;
13
14pub(crate) async fn call_oai_stream_inner(
20 cfg: &ProviderConfig,
21 client: &reqwest::Client,
22 tools_schema: &[Value],
23 system_prompt: &Option<String>,
24 messages: &[Value],
25 tx: &mpsc::UnboundedSender<StreamEvent>,
26 temperature: Option<f32>,
27 max_tokens: Option<u32>,
28 thinking_budget: u32,
29 cancel: &tokio_util::sync::CancellationToken,
30) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
31 let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
32 let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
33 let tools_opt = if oai_tools.is_empty() { None } else { Some(oai_tools) };
34
35 let stream_options = if cfg.base_url.contains("googleapis.com") {
37 None
38 } else {
39 Some(StreamOptions { include_usage: true })
40 };
41
42 let mut body = serde_json::Map::new();
43 body.insert("model".to_string(), json!(cfg.model.clone()));
44 body.insert("messages".to_string(), serde_json::to_value(oai_messages)?);
45 body.insert("stream".to_string(), json!(true));
46 if let Some(stream_options) = stream_options {
47 body.insert("stream_options".to_string(), serde_json::to_value(stream_options)?);
48 }
49 if let Some(max_tokens) = max_tokens {
50 body.insert("max_tokens".to_string(), json!(max_tokens));
51 }
52 if let Some(temperature) = temperature {
53 body.insert("temperature".to_string(), json!(temperature));
54 }
55 if let Some(tools) = tools_opt {
56 body.insert("tools".to_string(), serde_json::to_value(tools)?);
57 }
58 super::reasoning::apply_openai_reasoning_params(
59 &mut body,
60 super::reasoning::provider_for_key(&cfg.provider),
61 &cfg.model,
62 thinking_budget,
63 );
64 let body = Value::Object(body);
65
66 let url = format!("{}/chat/completions", cfg.base_url.trim_end_matches('/'));
67
68 tracing::debug!(url=%url, model=%cfg.model, "openai stream request");
69
70 let resp = match client
71 .post(&url)
72 .bearer_auth(&cfg.api_key)
73 .header("content-type", "application/json")
74 .header("accept", "text/event-stream")
75 .json(&body)
76 .send()
77 .await
78 {
79 Ok(r) => r,
80 Err(e) => {
81 if e.is_connect() && url.contains("localhost") {
82 return Err(format!(
83 "Can't reach local endpoint at {} — is Ollama/LM Studio running?",
84 url
85 ).into());
86 }
87 return Err(e.into());
88 }
89 };
90
91 if !resp.status().is_success() {
92 let status = resp.status();
93 let text = resp.text().await.unwrap_or_default();
94 return Err(format!("openai request failed: {status}: {text}").into());
95 }
96
97 let mut decoder = StreamDecoder::new();
98 let mut accumulated_text = String::new();
99 let mut tool_use_blocks: Vec<Value> = Vec::new();
100 let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
101 let mut sink: Vec<OaiEvent> = Vec::with_capacity(4);
102 let mut stream = resp.bytes_stream();
103
104 while let Some(chunk) = tokio::select! {
105 chunk = stream.next() => chunk,
106 _ = cancel.cancelled() => {
107 return Err("request canceled".into());
108 }
109 } {
110 let chunk = chunk?;
111 buf.extend_from_slice(&chunk);
112
113 while let Some(nl) = memchr::memchr(b'\n', &buf) {
115 let line_bytes = buf.split_to(nl + 1); let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
117
118 sink.clear();
119 decoder.push_line(line, &mut sink);
120 handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
121 }
122 }
123
124 if !buf.is_empty() {
126 let line = std::str::from_utf8(&buf).unwrap_or("");
127 sink.clear();
128 decoder.push_line(line, &mut sink);
129 handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
130 }
131 sink.clear();
132 decoder.finish(&mut sink);
133 handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
134
135 let mut content: Vec<Value> = Vec::new();
137 if !accumulated_text.is_empty() {
138 content.push(json!({"type": "text", "text": accumulated_text}));
139 }
140 content.extend(tool_use_blocks);
141
142 Ok(json!({
143 "role": "assistant",
144 "content": content,
145 }))
146}
147
148pub(crate) async fn call_codex_stream_inner(
149 cfg: &ProviderConfig,
150 client: &reqwest::Client,
151 tools_schema: &[Value],
152 system_prompt: &Option<String>,
153 messages: &[Value],
154 tx: &mpsc::UnboundedSender<StreamEvent>,
155 temperature: Option<f32>,
156 max_tokens: Option<u32>,
157 cancel: &tokio_util::sync::CancellationToken,
158) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
159 let creds = if cfg.api_key.is_empty() {
160 crate::auth::ensure_fresh_provider_token(client, "openai-codex").await?
161 } else {
162 crate::auth::OAuthCredentials {
163 auth_type: "oauth".to_string(),
164 refresh: String::new(),
165 access: cfg.api_key.clone(),
166 expires: 0,
167 account_id: None,
168 }
169 };
170 let account_id = creds
171 .account_id
172 .clone()
173 .or_else(|| crate::auth::extract_codex_account_id(&creds.access))
174 .ok_or("Failed to extract ChatGPT account id from Codex token")?;
175
176 let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
177 let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
178 let tools: Vec<Value> = oai_tools
179 .into_iter()
180 .map(|tool| {
181 json!({
182 "type": "function",
183 "name": tool.function.name,
184 "description": tool.function.description.unwrap_or_default(),
185 "parameters": tool.function.parameters,
186 })
187 })
188 .collect();
189
190 let mut body = json!({
191 "model": cfg.model,
192 "store": false,
193 "stream": true,
194 "instructions": codex_instructions(system_prompt),
195 "input": codex_input_messages(oai_messages),
196 "tool_choice": "auto",
197 "parallel_tool_calls": true,
198 "include": ["reasoning.encrypted_content"],
199 "text": { "verbosity": "medium" },
200 });
201 if !tools.is_empty() {
202 body["tools"] = Value::Array(tools);
203 }
204 if let Some(temp) = temperature {
205 body["temperature"] = json!(temp);
206 }
207 if let Some(max) = max_tokens {
208 body["max_output_tokens"] = json!(max);
209 }
210
211 let url = format!(
212 "{}/codex/responses",
213 cfg.base_url.trim_end_matches('/').trim_end_matches("/codex")
214 );
215 tracing::debug!(url=%url, model=%cfg.model, "codex stream request");
216
217 let resp = client
218 .post(&url)
219 .bearer_auth(&creds.access)
220 .header("chatgpt-account-id", account_id)
221 .header("originator", "synaps")
222 .header("OpenAI-Beta", "responses=experimental")
223 .header("content-type", "application/json")
224 .header("accept", "text/event-stream")
225 .json(&body)
226 .send()
227 .await?;
228
229 if !resp.status().is_success() {
230 let status = resp.status();
231 let text = resp.text().await.unwrap_or_default();
232 return Err(format!("codex request failed: {status}: {text}").into());
233 }
234
235 let mut accumulated_text = String::new();
236 let mut parser = CodexSseDecoder::default();
237 let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
238 let mut stream = resp.bytes_stream();
239
240 while let Some(chunk) = tokio::select! {
241 chunk = stream.next() => chunk,
242 _ = cancel.cancelled() => {
243 return Err("request canceled".into());
244 }
245 } {
246 let chunk = chunk?;
247 buf.extend_from_slice(&chunk);
248 while let Some(nl) = memchr::memchr(b'\n', &buf) {
249 let line_bytes = buf.split_to(nl + 1);
250 let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
251 parser.push_line(line, tx, &mut accumulated_text);
252 }
253 }
254 if !buf.is_empty() {
255 let line = std::str::from_utf8(&buf).unwrap_or("");
256 parser.push_line(line, tx, &mut accumulated_text);
257 }
258 parser.finish();
259
260 let mut content: Vec<Value> = Vec::new();
261 if !accumulated_text.is_empty() {
262 content.push(json!({"type": "text", "text": accumulated_text}));
263 }
264 content.extend(translate::tool_calls_to_content_blocks(&parser.completed_tools, &name_map));
265
266 Ok(json!({
267 "role": "assistant",
268 "content": content,
269 }))
270}
271
272const CODEX_AUTONOMOUS_LOOP_POLICY: &str = "\n\n[Synaps autonomous harness policy]\nThis harness is non-interactive after the user has provided the task/spec. Do not stop at phase boundaries, milestones, checkpoints, or after presenting a plan unless the full requested job is complete. Do not ask the user whether to continue. When a phase/checkpoint is reached, run any relevant verification and continue autonomously until the full requested job is complete, blocked by an unrecoverable error, or explicit user instructions require stopping.\n[End Synaps autonomous harness policy]";
273
274fn codex_instructions(system_prompt: &Option<String>) -> String {
275 let mut instructions = system_prompt.clone().unwrap_or_default();
276 if instructions.contains("[Synaps autonomous harness policy]") {
277 return instructions;
278 }
279 instructions.push_str(CODEX_AUTONOMOUS_LOOP_POLICY);
280 instructions
281}
282
283fn codex_input_messages(messages: Vec<ChatMessage>) -> Vec<Value> {
284 let mut out = Vec::new();
285 for msg in messages {
286 if let Some(tool_calls) = msg.tool_calls {
287 for call in tool_calls {
288 let mut item = json!({
295 "type": "function_call",
296 "call_id": call.id,
297 "name": call.function.name,
298 "arguments": call.function.arguments,
299 });
300 if call.id.starts_with("fc") {
301 item["id"] = Value::from(call.id.clone());
302 }
303 out.push(item);
304 }
305 continue;
306 }
307 if msg.role == "tool" {
308 if let Some(call_id) = msg.tool_call_id {
311 out.push(json!({
312 "type": "function_call_output",
313 "call_id": call_id,
314 "output": msg.content.unwrap_or_default(),
315 }));
316 }
317 continue;
318 }
319 out.push(json!({
320 "role": msg.role,
321 "content": msg.content.unwrap_or_default(),
322 }));
323 }
324 out
325}
326
327#[derive(Default)]
328struct CodexSseDecoder {
329 buffer: String,
330 active_tools: Vec<CodexToolAccumulator>,
331 completed_tools: Vec<ToolCall>,
332}
333
334#[derive(Default)]
335struct CodexToolAccumulator {
336 id: String,
337 name: String,
338 arguments: String,
339 started: bool,
340}
341
342fn parse_tool_arguments(raw: &str) -> Value {
350 if raw.trim().is_empty() {
351 return json!({});
352 }
353 match serde_json::from_str(raw) {
354 Ok(v) => v,
355 Err(e) => json!({ "__parse_error": format!("invalid tool input JSON: {}", e) }),
356 }
357}
358
359impl CodexSseDecoder {
360 fn push_line(
361 &mut self,
362 line: &str,
363 tx: &mpsc::UnboundedSender<StreamEvent>,
364 text_acc: &mut String,
365 ) {
366 let line = line.trim_end_matches('\r');
367 if line.is_empty() {
368 if !self.buffer.is_empty() {
369 let payload = std::mem::take(&mut self.buffer);
370 self.push_payload(&payload, tx, text_acc);
371 }
372 return;
373 }
374 let Some(data) = line.strip_prefix("data:").map(str::trim_start) else {
375 return;
376 };
377 if data == "[DONE]" {
378 self.finish();
379 return;
380 }
381 self.buffer.push_str(data);
382 }
383
384 fn push_payload(
385 &mut self,
386 payload: &str,
387 tx: &mpsc::UnboundedSender<StreamEvent>,
388 text_acc: &mut String,
389 ) {
390 let Ok(event) = serde_json::from_str::<Value>(payload) else {
391 return;
392 };
393 let event_type = event.get("type").and_then(Value::as_str).unwrap_or_default();
394 match event_type {
395 "response.output_text.delta" => {
396 if let Some(delta) = event.get("delta").and_then(Value::as_str) {
397 text_acc.push_str(delta);
398 let _ = tx.send(StreamEvent::Llm(crate::runtime::types::LlmEvent::Text(
399 delta.to_string(),
400 )));
401 }
402 }
403 "response.output_item.added" => {
404 if let Some(item) = event.get("item") {
405 let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
406 self.add_tool_from_item(idx, item, tx);
407 }
408 }
409 "response.function_call_arguments.delta" => {
410 let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
411 let delta = event.get("delta").and_then(Value::as_str).unwrap_or_default();
412 if !delta.is_empty() {
413 let tool = self.ensure_tool(idx);
414 tool.arguments.push_str(delta);
415 let tool_id = tool.id.clone();
416 let _ = tx.send(StreamEvent::Llm(
417 crate::runtime::types::LlmEvent::ToolUseDelta {
418 tool_id,
419 delta: delta.to_string(),
420 },
421 ));
422 }
423 }
424 "response.output_item.done" => {
425 if let Some(item) = event.get("item") {
426 let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
427 self.complete_tool_from_item(idx, item, tx);
428 }
429 }
430 "response.completed" | "response.done" => {
431 self.push_usage(&event, tx);
432 self.finish();
433 }
434 _ => {}
435 }
436 }
437
438 fn ensure_tool(&mut self, idx: usize) -> &mut CodexToolAccumulator {
439 while self.active_tools.len() <= idx {
440 self.active_tools.push(CodexToolAccumulator::default());
441 }
442 &mut self.active_tools[idx]
443 }
444
445 fn add_tool_from_item(
446 &mut self,
447 idx: usize,
448 item: &Value,
449 tx: &mpsc::UnboundedSender<StreamEvent>,
450 ) {
451 if item.get("type").and_then(Value::as_str) != Some("function_call") {
452 return;
453 }
454 let tool = self.ensure_tool(idx);
455 if let Some(id) = item
456 .get("call_id")
457 .or_else(|| item.get("id"))
458 .and_then(Value::as_str)
459 {
460 tool.id = id.to_string();
461 }
462 if let Some(name) = item.get("name").and_then(Value::as_str) {
463 tool.name = name.to_string();
464 }
465 if !tool.started && !tool.name.is_empty() {
466 tool.started = true;
467 let _ = tx.send(StreamEvent::Llm(
468 crate::runtime::types::LlmEvent::ToolUseStart {
469 tool_name: tool.name.clone(),
470 tool_id: tool.id.clone(),
471 },
472 ));
473 }
474 }
475
476 fn complete_tool_from_item(
477 &mut self,
478 idx: usize,
479 item: &Value,
480 tx: &mpsc::UnboundedSender<StreamEvent>,
481 ) {
482 if item.get("type").and_then(Value::as_str) != Some("function_call") {
483 return;
484 }
485 let tool = self.ensure_tool(idx);
486 if let Some(id) = item
487 .get("call_id")
488 .or_else(|| item.get("id"))
489 .and_then(Value::as_str)
490 {
491 tool.id = id.to_string();
492 }
493 if let Some(name) = item.get("name").and_then(Value::as_str) {
494 tool.name = name.to_string();
495 }
496 if let Some(arguments) = item.get("arguments").and_then(Value::as_str) {
497 tool.arguments = arguments.to_string();
498 }
499 if !tool.started && !tool.name.is_empty() {
500 tool.started = true;
501 let _ = tx.send(StreamEvent::Llm(
502 crate::runtime::types::LlmEvent::ToolUseStart {
503 tool_name: tool.name.clone(),
504 tool_id: tool.id.clone(),
505 },
506 ));
507 }
508 let completed = if !tool.id.is_empty() && !tool.name.is_empty() {
509 Some(ToolCall {
510 id: tool.id.clone(),
511 kind: "function".to_string(),
512 function: super::types::FunctionCall {
513 name: tool.name.clone(),
514 arguments: tool.arguments.clone(),
515 },
516 })
517 } else {
518 None
519 };
520 if let Some(call) = completed {
521 if self.completed_tools.iter().any(|done| done.id == call.id) {
522 return;
523 }
524 let input = parse_tool_arguments(&call.function.arguments);
532 let _ = tx.send(StreamEvent::Llm(
533 crate::runtime::types::LlmEvent::ToolUse {
534 tool_name: call.function.name.clone(),
535 tool_id: call.id.clone(),
536 input,
537 },
538 ));
539 self.completed_tools.push(ToolCall {
540 id: call.id,
541 kind: call.kind,
542 function: call.function,
543 });
544 }
545 }
546
547 fn push_usage(&self, event: &Value, tx: &mpsc::UnboundedSender<StreamEvent>) {
548 let usage = event
549 .get("response")
550 .and_then(|r| r.get("usage"))
551 .or_else(|| event.get("usage"));
552 let input = usage
553 .and_then(|u| u.get("input_tokens"))
554 .and_then(Value::as_u64)
555 .unwrap_or(0);
556 let output = usage
557 .and_then(|u| u.get("output_tokens"))
558 .and_then(Value::as_u64)
559 .unwrap_or(0);
560 if input > 0 || output > 0 {
561 let _ = tx.send(StreamEvent::Session(crate::runtime::types::SessionEvent::Usage {
562 input_tokens: input,
563 output_tokens: output,
564 cache_read_input_tokens: 0,
565 cache_creation_input_tokens: 0,
566 model: None,
567 }));
568 }
569 }
570
571 fn finish(&mut self) {
572 for tool in self.active_tools.drain(..) {
573 if !tool.id.is_empty()
574 && !tool.name.is_empty()
575 && !self.completed_tools.iter().any(|done| done.id == tool.id)
576 {
577 self.completed_tools.push(ToolCall {
578 id: tool.id,
579 kind: "function".to_string(),
580 function: super::types::FunctionCall {
581 name: tool.name,
582 arguments: tool.arguments,
583 },
584 });
585 }
586 }
587 }
588}
589
590fn handle_events(
591 events: &[OaiEvent],
592 tx: &mpsc::UnboundedSender<StreamEvent>,
593 text_acc: &mut String,
594 tool_blocks: &mut Vec<Value>,
595 name_map: &translate::ToolNameMap,
596) {
597 for ev in events {
598 if let OaiEvent::TextDelta(t) = ev {
599 text_acc.push_str(t);
600 }
601 if let OaiEvent::ToolCallsComplete { calls, .. } = ev {
602 tool_blocks.extend(translate::tool_calls_to_content_blocks(calls, name_map));
603 }
604 if let Some(se) = translate::oai_event_to_llm(ev) {
605 let _ = tx.send(se);
606 }
607 }
608}
609
610#[cfg(test)]
611mod codex_input_messages_tests {
612 use super::*;
628 use super::super::types::{ChatMessage, FunctionCall, ToolCall};
629
630 fn sample_tool_call() -> ToolCall {
631 ToolCall {
632 id: "call_nZYquCuGUh8Qs9H51dwHMDgs".to_string(),
633 kind: "function".to_string(),
634 function: FunctionCall {
635 name: "bash".to_string(),
636 arguments: r#"{"command":"ls"}"#.to_string(),
637 },
638 }
639 }
640
641 #[test]
642 fn codex_instructions_appends_autonomous_loop_policy() {
643 let instructions = codex_instructions(&Some("Project-specific rules.".to_string()));
644 assert!(instructions.contains("Project-specific rules."));
645 assert!(instructions.contains("Do not stop at phase boundaries"));
646 assert!(instructions.contains("Do not ask the user whether to continue"));
647 assert!(instructions.contains("continue autonomously until the full requested job is complete"));
648 }
649
650 #[test]
651 fn function_call_input_omits_non_fc_id() {
652 let messages = vec![ChatMessage::assistant_tool_calls(vec![sample_tool_call()])];
653 let out = codex_input_messages(messages);
654 assert_eq!(out.len(), 1, "one tool_call → one input item");
655 let item = &out[0];
656 assert_eq!(item.get("type").and_then(Value::as_str), Some("function_call"));
657 assert!(
658 item.get("id").is_none(),
659 "must not echo a non-`fc_` id back; got {:?}",
660 item.get("id"),
661 );
662 assert_eq!(
663 item.get("call_id").and_then(Value::as_str),
664 Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
665 );
666 assert_eq!(item.get("name").and_then(Value::as_str), Some("bash"));
667 }
668
669 #[test]
670 fn function_call_input_keeps_real_fc_id() {
671 let mut call = sample_tool_call();
674 call.id = "fc_abc123".to_string();
675 let messages = vec![ChatMessage::assistant_tool_calls(vec![call])];
676 let out = codex_input_messages(messages);
677 let item = &out[0];
678 assert_eq!(item.get("id").and_then(Value::as_str), Some("fc_abc123"));
679 assert_eq!(item.get("call_id").and_then(Value::as_str), Some("fc_abc123"));
680 }
681
682 #[test]
683 fn function_call_output_round_trips_call_id() {
684 let messages = vec![ChatMessage::tool_result(
686 "call_nZYquCuGUh8Qs9H51dwHMDgs",
687 "bash",
688 "total 0",
689 )];
690 let out = codex_input_messages(messages);
691 let item = &out[0];
692 assert_eq!(
693 item.get("type").and_then(Value::as_str),
694 Some("function_call_output"),
695 );
696 assert_eq!(
697 item.get("call_id").and_then(Value::as_str),
698 Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
699 );
700 assert_eq!(item.get("output").and_then(Value::as_str), Some("total 0"));
701 }
702}
703
704#[cfg(test)]
705mod codex_decoder_tests {
706 use super::*;
713 use crate::runtime::types::{LlmEvent, SessionEvent, StreamEvent};
714
715 fn collect_events(rx: &mut mpsc::UnboundedReceiver<StreamEvent>) -> Vec<StreamEvent> {
716 let mut out = Vec::new();
717 while let Ok(ev) = rx.try_recv() {
718 out.push(ev);
719 }
720 out
721 }
722
723 fn drive(lines: &[&str]) -> (CodexSseDecoder, String, Vec<StreamEvent>) {
724 let (tx, mut rx) = mpsc::unbounded_channel();
725 let mut decoder = CodexSseDecoder::default();
726 let mut text_acc = String::new();
727 for line in lines {
728 decoder.push_line(line, &tx, &mut text_acc);
729 }
730 let events = collect_events(&mut rx);
731 (decoder, text_acc, events)
732 }
733
734 #[test]
735 fn text_delta_aggregates_into_text_acc_and_emits_text_events() {
736 let lines = [
737 r#"data: {"type":"response.output_text.delta","delta":"Hello, "}"#,
738 "",
739 r#"data: {"type":"response.output_text.delta","delta":"world!"}"#,
740 "",
741 ];
742 let (_decoder, text_acc, events) = drive(&lines);
743 assert_eq!(text_acc, "Hello, world!");
744 let texts: Vec<_> = events
745 .iter()
746 .filter_map(|e| match e {
747 StreamEvent::Llm(LlmEvent::Text(t)) => Some(t.as_str()),
748 _ => None,
749 })
750 .collect();
751 assert_eq!(texts, vec!["Hello, ", "world!"]);
752 }
753
754 #[test]
755 fn single_function_call_completes_via_output_item_done() {
756 let lines = [
757 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
758 "",
759 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\""}"#,
760 "",
761 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":":\"ls\"}"}"#,
762 "",
763 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
764 "",
765 ];
766 let (decoder, _text, events) = drive(&lines);
767
768 assert_eq!(decoder.completed_tools.len(), 1);
769 let tool = &decoder.completed_tools[0];
770 assert_eq!(tool.id, "call_abc");
771 assert_eq!(tool.function.name, "bash");
772 assert_eq!(tool.function.arguments, r#"{"cmd":"ls"}"#);
773
774 let starts: Vec<_> = events
776 .iter()
777 .filter_map(|e| match e {
778 StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
779 Some((tool_name.as_str(), tool_id.as_str()))
780 }
781 _ => None,
782 })
783 .collect();
784 assert_eq!(
785 starts,
786 vec![("bash", "call_abc")],
787 "exactly one ToolUseStart with correct tool_id"
788 );
789
790 let deltas: Vec<_> = events
793 .iter()
794 .filter_map(|e| match e {
795 StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
796 Some((tool_id.as_str(), delta.as_str()))
797 }
798 _ => None,
799 })
800 .collect();
801 assert_eq!(
802 deltas,
803 vec![("call_abc", r#"{"cmd""#), ("call_abc", r#":"ls"}"#)]
804 );
805 }
806
807 #[test]
808 fn parallel_tool_calls_indexed_separately() {
809 let lines = [
810 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
811 "",
812 r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
813 "",
814 r#"data: {"type":"response.function_call_arguments.delta","output_index":1,"delta":"{\"path\":\"a\"}"}"#,
815 "",
816 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\":\"ls\"}"}"#,
817 "",
818 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
819 "",
820 r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
821 "",
822 ];
823 let (decoder, _text, _events) = drive(&lines);
824
825 assert_eq!(decoder.completed_tools.len(), 2);
826 let mut by_id: std::collections::BTreeMap<&str, &ToolCall> = std::collections::BTreeMap::new();
827 for tool in &decoder.completed_tools {
828 by_id.insert(tool.id.as_str(), tool);
829 }
830 assert_eq!(by_id["call_1"].function.name, "bash");
831 assert_eq!(by_id["call_1"].function.arguments, r#"{"cmd":"ls"}"#);
832 assert_eq!(by_id["call_2"].function.name, "read");
833 assert_eq!(by_id["call_2"].function.arguments, r#"{"path":"a"}"#);
834 }
835
836 #[test]
837 fn output_item_done_emits_tool_use_event() {
838 let lines = [
845 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
846 "",
847 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"command\":\"ls\"}"}"#,
848 "",
849 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
850 "",
851 ];
852 let (_decoder, _text, events) = drive(&lines);
853
854 let tool_uses: Vec<_> = events
855 .iter()
856 .filter_map(|e| match e {
857 StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
858 Some((tool_name.as_str(), tool_id.as_str(), input.clone()))
859 }
860 _ => None,
861 })
862 .collect();
863 assert_eq!(tool_uses.len(), 1, "expected exactly one ToolUse finalize event");
864 assert_eq!(tool_uses[0].0, "bash");
865 assert_eq!(tool_uses[0].1, "call_abc");
866 assert_eq!(
867 tool_uses[0].2,
868 serde_json::json!({"command": "ls"}),
869 "input must be parsed as a JSON Value, not a string"
870 );
871 }
872
873 #[test]
874 fn parallel_tool_calls_emit_tool_use_per_index() {
875 let lines = [
879 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
880 "",
881 r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
882 "",
883 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
884 "",
885 r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
886 "",
887 ];
888 let (_decoder, _text, events) = drive(&lines);
889
890 let tool_uses: Vec<_> = events
891 .iter()
892 .filter_map(|e| match e {
893 StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
894 Some((tool_name.clone(), tool_id.clone(), input.clone()))
895 }
896 _ => None,
897 })
898 .collect();
899
900 assert_eq!(tool_uses.len(), 2, "one ToolUse finalize per parallel call");
901 let by_id: std::collections::BTreeMap<&str, &(String, String, serde_json::Value)> =
902 tool_uses.iter().map(|t| (t.1.as_str(), t)).collect();
903 assert_eq!(by_id["call_1"].0, "bash");
904 assert_eq!(by_id["call_1"].2, serde_json::json!({"command": "ls"}));
905 assert_eq!(by_id["call_2"].0, "read");
906 assert_eq!(by_id["call_2"].2, serde_json::json!({"path": "a"}));
907 }
908
909 #[test]
910 fn malformed_arguments_emit_tool_use_with_parse_error() {
911 let lines = [
916 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash"}}"#,
917 "",
918 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash","arguments":"{not json"}}"#,
919 "",
920 ];
921 let (_decoder, _text, events) = drive(&lines);
922
923 let tool_use = events.iter().find_map(|e| match e {
924 StreamEvent::Llm(LlmEvent::ToolUse { input, .. }) => Some(input.clone()),
925 _ => None,
926 });
927 let input = tool_use.expect("ToolUse event missing");
928 assert!(
929 input.get("__parse_error").and_then(Value::as_str).is_some(),
930 "malformed arguments must surface __parse_error, got {input}"
931 );
932 }
933
934 #[test]
935 fn response_completed_emits_usage_event() {
936 let lines = [
937 r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":42,"output_tokens":17}}}"#,
938 "",
939 ];
940 let (_decoder, _text, events) = drive(&lines);
941 let usage = events.iter().find_map(|e| match e {
942 StreamEvent::Session(SessionEvent::Usage {
943 input_tokens,
944 output_tokens,
945 ..
946 }) => Some((*input_tokens, *output_tokens)),
947 _ => None,
948 });
949 assert_eq!(usage, Some((42, 17)));
950 }
951
952 #[test]
953 fn response_completed_with_zero_usage_emits_nothing() {
954 let lines = [
955 r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":0,"output_tokens":0}}}"#,
956 "",
957 ];
958 let (_decoder, _text, events) = drive(&lines);
959 let any_usage = events.iter().any(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })));
960 assert!(!any_usage, "zero-token usage should be suppressed");
961 }
962
963 #[test]
964 fn done_marker_finishes_decoder() {
965 let lines = [
966 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_x","name":"bash"}}"#,
967 "",
968 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{}"}"#,
969 "",
970 "data: [DONE]",
971 "",
972 ];
973 let (decoder, _text, _events) = drive(&lines);
974 assert_eq!(decoder.completed_tools.len(), 1);
976 assert_eq!(decoder.completed_tools[0].id, "call_x");
977 assert_eq!(decoder.completed_tools[0].function.arguments, "{}");
978 }
979
980 #[test]
981 fn finish_is_idempotent_no_double_emit() {
982 let lines = [
983 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_y","name":"bash","arguments":"{}"}}"#,
984 "",
985 ];
986 let (mut decoder, _text, _events) = drive(&lines);
987 assert_eq!(decoder.completed_tools.len(), 1);
988
989 decoder.finish();
991 assert_eq!(
992 decoder.completed_tools.len(),
993 1,
994 "finish() called twice must not double-emit"
995 );
996 }
997
998 #[test]
999 fn finish_drains_active_tools_for_state_hygiene() {
1000 let lines = [
1005 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_z","name":"bash"}}"#,
1006 "",
1007 "data: [DONE]",
1008 "",
1009 ];
1010 let (decoder, _text, _events) = drive(&lines);
1011 assert_eq!(decoder.completed_tools.len(), 1);
1012 assert!(
1013 decoder.active_tools.is_empty(),
1014 "active_tools must be drained after finish()"
1015 );
1016 }
1017
1018 #[test]
1019 fn unknown_event_types_are_ignored() {
1020 let lines = [
1021 r#"data: {"type":"response.future_unknown_event","payload":{"x":1}}"#,
1022 "",
1023 r#"data: {"type":"response.output_text.delta","delta":"hi"}"#,
1024 "",
1025 ];
1026 let (_decoder, text_acc, _events) = drive(&lines);
1027 assert_eq!(text_acc, "hi");
1028 }
1029}