1use super::translate;
7use super::types::{ChatMessage, OaiEvent, ProviderConfig, StreamOptions, ToolCall};
8use super::wire::StreamDecoder;
9use crate::runtime::types::StreamEvent;
10use futures::StreamExt;
11use serde_json::{json, Value};
12use tokio::sync::mpsc;
13
14#[allow(clippy::too_many_arguments)]
20pub(crate) async fn call_oai_stream_inner(
21 cfg: &ProviderConfig,
22 client: &reqwest::Client,
23 tools_schema: &[Value],
24 system_prompt: &Option<String>,
25 messages: &[Value],
26 tx: &mpsc::UnboundedSender<StreamEvent>,
27 temperature: Option<f32>,
28 max_tokens: Option<u32>,
29 thinking_budget: u32,
30 cancel: &tokio_util::sync::CancellationToken,
31) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
32 let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
33 let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
34 let tools_opt = if oai_tools.is_empty() { None } else { Some(oai_tools) };
35
36 let stream_options = if cfg.base_url.contains("googleapis.com") {
38 None
39 } else {
40 Some(StreamOptions { include_usage: true })
41 };
42
43 let mut body = serde_json::Map::new();
44 body.insert("model".to_string(), json!(cfg.model.clone()));
45 body.insert("messages".to_string(), serde_json::to_value(oai_messages)?);
46 body.insert("stream".to_string(), json!(true));
47 if let Some(stream_options) = stream_options {
48 body.insert("stream_options".to_string(), serde_json::to_value(stream_options)?);
49 }
50 if let Some(max_tokens) = max_tokens {
51 body.insert("max_tokens".to_string(), json!(max_tokens));
52 }
53 if let Some(temperature) = temperature {
54 body.insert("temperature".to_string(), json!(temperature));
55 }
56 if let Some(tools) = tools_opt {
57 body.insert("tools".to_string(), serde_json::to_value(tools)?);
58 }
59 super::reasoning::apply_openai_reasoning_params(
60 &mut body,
61 super::reasoning::provider_for_key(&cfg.provider),
62 &cfg.model,
63 thinking_budget,
64 );
65 let body = Value::Object(body);
66
67 let url = format!("{}/chat/completions", cfg.base_url.trim_end_matches('/'));
68
69 tracing::debug!(url=%url, model=%cfg.model, "openai stream request");
70
71 let resp = match client
72 .post(&url)
73 .bearer_auth(&cfg.api_key)
74 .header("content-type", "application/json")
75 .header("accept", "text/event-stream")
76 .json(&body)
77 .send()
78 .await
79 {
80 Ok(r) => r,
81 Err(e) => {
82 if e.is_connect() && url.contains("localhost") {
83 return Err(format!(
84 "Can't reach local endpoint at {} — is Ollama/LM Studio running?",
85 url
86 ).into());
87 }
88 return Err(e.into());
89 }
90 };
91
92 if !resp.status().is_success() {
93 let status = resp.status();
94 let text = resp.text().await.unwrap_or_default();
95 return Err(format!("openai request failed: {status}: {text}").into());
96 }
97
98 let mut decoder = StreamDecoder::new();
99 let mut accumulated_text = String::new();
100 let mut tool_use_blocks: Vec<Value> = Vec::new();
101 let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
102 let mut sink: Vec<OaiEvent> = Vec::with_capacity(4);
103 let mut stream = resp.bytes_stream();
104
105 while let Some(chunk) = tokio::select! {
106 chunk = stream.next() => chunk,
107 _ = cancel.cancelled() => {
108 return Err("request canceled".into());
109 }
110 } {
111 let chunk = chunk?;
112 buf.extend_from_slice(&chunk);
113
114 while let Some(nl) = memchr::memchr(b'\n', &buf) {
116 let line_bytes = buf.split_to(nl + 1); let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
118
119 sink.clear();
120 decoder.push_line(line, &mut sink);
121 handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
122 }
123 }
124
125 if !buf.is_empty() {
127 let line = std::str::from_utf8(&buf).unwrap_or("");
128 sink.clear();
129 decoder.push_line(line, &mut sink);
130 handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
131 }
132 sink.clear();
133 decoder.finish(&mut sink);
134 handle_events(&sink, tx, &mut accumulated_text, &mut tool_use_blocks, &name_map);
135
136 let mut content: Vec<Value> = Vec::new();
138 if !accumulated_text.is_empty() {
139 content.push(json!({"type": "text", "text": accumulated_text}));
140 }
141 content.extend(tool_use_blocks);
142
143 Ok(json!({
144 "role": "assistant",
145 "content": content,
146 }))
147}
148
149#[allow(clippy::too_many_arguments)]
150pub(crate) async fn call_codex_stream_inner(
151 cfg: &ProviderConfig,
152 client: &reqwest::Client,
153 tools_schema: &[Value],
154 system_prompt: &Option<String>,
155 messages: &[Value],
156 tx: &mpsc::UnboundedSender<StreamEvent>,
157 temperature: Option<f32>,
158 max_tokens: Option<u32>,
159 cancel: &tokio_util::sync::CancellationToken,
160) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
161 let creds = if cfg.api_key.is_empty() {
162 crate::auth::ensure_fresh_provider_token(client, "openai-codex").await?
163 } else {
164 crate::auth::OAuthCredentials {
165 auth_type: "oauth".to_string(),
166 refresh: String::new(),
167 access: cfg.api_key.clone(),
168 expires: 0,
169 account_id: None,
170 }
171 };
172 let account_id = creds
173 .account_id
174 .clone()
175 .or_else(|| crate::auth::extract_codex_account_id(&creds.access))
176 .ok_or("Failed to extract ChatGPT account id from Codex token")?;
177
178 let (oai_tools, name_map) = translate::tools_to_oai(tools_schema);
179 let oai_messages = translate::messages_to_oai(messages, system_prompt, &name_map);
180 let tools: Vec<Value> = oai_tools
181 .into_iter()
182 .map(|tool| {
183 json!({
184 "type": "function",
185 "name": tool.function.name,
186 "description": tool.function.description.unwrap_or_default(),
187 "parameters": tool.function.parameters,
188 })
189 })
190 .collect();
191
192 let mut body = json!({
193 "model": cfg.model,
194 "store": false,
195 "stream": true,
196 "instructions": codex_instructions(system_prompt),
197 "input": codex_input_messages(oai_messages),
198 "tool_choice": "auto",
199 "parallel_tool_calls": true,
200 "include": ["reasoning.encrypted_content"],
201 "text": { "verbosity": "medium" },
202 });
203 if !tools.is_empty() {
204 body["tools"] = Value::Array(tools);
205 }
206 if let Some(temp) = temperature {
207 body["temperature"] = json!(temp);
208 }
209 if let Some(max) = max_tokens {
210 body["max_output_tokens"] = json!(max);
211 }
212
213 let url = format!(
214 "{}/codex/responses",
215 cfg.base_url.trim_end_matches('/').trim_end_matches("/codex")
216 );
217 tracing::debug!(url=%url, model=%cfg.model, "codex stream request");
218
219 let resp = client
220 .post(&url)
221 .bearer_auth(&creds.access)
222 .header("chatgpt-account-id", account_id)
223 .header("originator", "synaps")
224 .header("OpenAI-Beta", "responses=experimental")
225 .header("content-type", "application/json")
226 .header("accept", "text/event-stream")
227 .json(&body)
228 .send()
229 .await?;
230
231 if !resp.status().is_success() {
232 let status = resp.status();
233 let text = resp.text().await.unwrap_or_default();
234 return Err(format!("codex request failed: {status}: {text}").into());
235 }
236
237 let mut accumulated_text = String::new();
238 let mut parser = CodexSseDecoder::default();
239 let mut buf = bytes::BytesMut::with_capacity(8 * 1024);
240 let mut stream = resp.bytes_stream();
241
242 while let Some(chunk) = tokio::select! {
243 chunk = stream.next() => chunk,
244 _ = cancel.cancelled() => {
245 return Err("request canceled".into());
246 }
247 } {
248 let chunk = chunk?;
249 buf.extend_from_slice(&chunk);
250 while let Some(nl) = memchr::memchr(b'\n', &buf) {
251 let line_bytes = buf.split_to(nl + 1);
252 let line = std::str::from_utf8(&line_bytes[..nl]).unwrap_or("");
253 parser.push_line(line, tx, &mut accumulated_text);
254 }
255 }
256 if !buf.is_empty() {
257 let line = std::str::from_utf8(&buf).unwrap_or("");
258 parser.push_line(line, tx, &mut accumulated_text);
259 }
260 parser.finish();
261
262 let mut content: Vec<Value> = Vec::new();
263 if !accumulated_text.is_empty() {
264 content.push(json!({"type": "text", "text": accumulated_text}));
265 }
266 content.extend(translate::tool_calls_to_content_blocks(&parser.completed_tools, &name_map));
267
268 Ok(json!({
269 "role": "assistant",
270 "content": content,
271 }))
272}
273
274const CODEX_AUTONOMOUS_LOOP_POLICY: &str = "\n\n[Synaps autonomous harness policy]\nThis harness is non-interactive after the user has provided the task/spec. Do not stop at phase boundaries, milestones, checkpoints, or after presenting a plan unless the full requested job is complete. Do not ask the user whether to continue. When a phase/checkpoint is reached, run any relevant verification and continue autonomously until the full requested job is complete, blocked by an unrecoverable error, or explicit user instructions require stopping.\n[End Synaps autonomous harness policy]";
275
276fn codex_instructions(system_prompt: &Option<String>) -> String {
277 let mut instructions = system_prompt.clone().unwrap_or_default();
278 if instructions.contains("[Synaps autonomous harness policy]") {
279 return instructions;
280 }
281 instructions.push_str(CODEX_AUTONOMOUS_LOOP_POLICY);
282 instructions
283}
284
285fn codex_input_messages(messages: Vec<ChatMessage>) -> Vec<Value> {
286 let mut out = Vec::new();
287 for msg in messages {
288 if let Some(tool_calls) = msg.tool_calls {
289 for call in tool_calls {
290 let mut item = json!({
297 "type": "function_call",
298 "call_id": call.id,
299 "name": call.function.name,
300 "arguments": call.function.arguments,
301 });
302 if call.id.starts_with("fc") {
303 item["id"] = Value::from(call.id.clone());
304 }
305 out.push(item);
306 }
307 continue;
308 }
309 if msg.role == "tool" {
310 if let Some(call_id) = msg.tool_call_id {
313 out.push(json!({
314 "type": "function_call_output",
315 "call_id": call_id,
316 "output": msg.content.unwrap_or_default(),
317 }));
318 }
319 continue;
320 }
321 out.push(json!({
322 "role": msg.role,
323 "content": msg.content.unwrap_or_default(),
324 }));
325 }
326 out
327}
328
329#[derive(Default)]
330struct CodexSseDecoder {
331 buffer: String,
332 active_tools: Vec<CodexToolAccumulator>,
333 completed_tools: Vec<ToolCall>,
334}
335
336#[derive(Default)]
337struct CodexToolAccumulator {
338 id: String,
339 name: String,
340 arguments: String,
341 started: bool,
342}
343
344fn parse_tool_arguments(raw: &str) -> Value {
352 if raw.trim().is_empty() {
353 return json!({});
354 }
355 match serde_json::from_str(raw) {
356 Ok(v) => v,
357 Err(e) => json!({ "__parse_error": format!("invalid tool input JSON: {}", e) }),
358 }
359}
360
361impl CodexSseDecoder {
362 fn push_line(
363 &mut self,
364 line: &str,
365 tx: &mpsc::UnboundedSender<StreamEvent>,
366 text_acc: &mut String,
367 ) {
368 let line = line.trim_end_matches('\r');
369 if line.is_empty() {
370 if !self.buffer.is_empty() {
371 let payload = std::mem::take(&mut self.buffer);
372 self.push_payload(&payload, tx, text_acc);
373 }
374 return;
375 }
376 let Some(data) = line.strip_prefix("data:").map(str::trim_start) else {
377 return;
378 };
379 if data == "[DONE]" {
380 self.finish();
381 return;
382 }
383 self.buffer.push_str(data);
384 }
385
386 fn push_payload(
387 &mut self,
388 payload: &str,
389 tx: &mpsc::UnboundedSender<StreamEvent>,
390 text_acc: &mut String,
391 ) {
392 let Ok(event) = serde_json::from_str::<Value>(payload) else {
393 return;
394 };
395 let event_type = event.get("type").and_then(Value::as_str).unwrap_or_default();
396 match event_type {
397 "response.output_text.delta" => {
398 if let Some(delta) = event.get("delta").and_then(Value::as_str) {
399 text_acc.push_str(delta);
400 let _ = tx.send(StreamEvent::Llm(crate::runtime::types::LlmEvent::Text(
401 delta.to_string(),
402 )));
403 }
404 }
405 "response.output_item.added" => {
406 if let Some(item) = event.get("item") {
407 let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
408 self.add_tool_from_item(idx, item, tx);
409 }
410 }
411 "response.function_call_arguments.delta" => {
412 let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
413 let delta = event.get("delta").and_then(Value::as_str).unwrap_or_default();
414 if !delta.is_empty() {
415 let tool = self.ensure_tool(idx);
416 tool.arguments.push_str(delta);
417 let tool_id = tool.id.clone();
418 let _ = tx.send(StreamEvent::Llm(
419 crate::runtime::types::LlmEvent::ToolUseDelta {
420 tool_id,
421 delta: delta.to_string(),
422 },
423 ));
424 }
425 }
426 "response.output_item.done" => {
427 if let Some(item) = event.get("item") {
428 let idx = event.get("output_index").and_then(Value::as_u64).unwrap_or(0) as usize;
429 self.complete_tool_from_item(idx, item, tx);
430 }
431 }
432 "response.completed" | "response.done" => {
433 self.push_usage(&event, tx);
434 self.finish();
435 }
436 _ => {}
437 }
438 }
439
440 fn ensure_tool(&mut self, idx: usize) -> &mut CodexToolAccumulator {
441 while self.active_tools.len() <= idx {
442 self.active_tools.push(CodexToolAccumulator::default());
443 }
444 &mut self.active_tools[idx]
445 }
446
447 fn add_tool_from_item(
448 &mut self,
449 idx: usize,
450 item: &Value,
451 tx: &mpsc::UnboundedSender<StreamEvent>,
452 ) {
453 if item.get("type").and_then(Value::as_str) != Some("function_call") {
454 return;
455 }
456 let tool = self.ensure_tool(idx);
457 if let Some(id) = item
458 .get("call_id")
459 .or_else(|| item.get("id"))
460 .and_then(Value::as_str)
461 {
462 tool.id = id.to_string();
463 }
464 if let Some(name) = item.get("name").and_then(Value::as_str) {
465 tool.name = name.to_string();
466 }
467 if !tool.started && !tool.name.is_empty() {
468 tool.started = true;
469 let _ = tx.send(StreamEvent::Llm(
470 crate::runtime::types::LlmEvent::ToolUseStart {
471 tool_name: tool.name.clone(),
472 tool_id: tool.id.clone(),
473 },
474 ));
475 }
476 }
477
478 fn complete_tool_from_item(
479 &mut self,
480 idx: usize,
481 item: &Value,
482 tx: &mpsc::UnboundedSender<StreamEvent>,
483 ) {
484 if item.get("type").and_then(Value::as_str) != Some("function_call") {
485 return;
486 }
487 let tool = self.ensure_tool(idx);
488 if let Some(id) = item
489 .get("call_id")
490 .or_else(|| item.get("id"))
491 .and_then(Value::as_str)
492 {
493 tool.id = id.to_string();
494 }
495 if let Some(name) = item.get("name").and_then(Value::as_str) {
496 tool.name = name.to_string();
497 }
498 if let Some(arguments) = item.get("arguments").and_then(Value::as_str) {
499 tool.arguments = arguments.to_string();
500 }
501 if !tool.started && !tool.name.is_empty() {
502 tool.started = true;
503 let _ = tx.send(StreamEvent::Llm(
504 crate::runtime::types::LlmEvent::ToolUseStart {
505 tool_name: tool.name.clone(),
506 tool_id: tool.id.clone(),
507 },
508 ));
509 }
510 let completed = if !tool.id.is_empty() && !tool.name.is_empty() {
511 Some(ToolCall {
512 id: tool.id.clone(),
513 kind: "function".to_string(),
514 function: super::types::FunctionCall {
515 name: tool.name.clone(),
516 arguments: tool.arguments.clone(),
517 },
518 })
519 } else {
520 None
521 };
522 if let Some(call) = completed {
523 if self.completed_tools.iter().any(|done| done.id == call.id) {
524 return;
525 }
526 let input = parse_tool_arguments(&call.function.arguments);
534 let _ = tx.send(StreamEvent::Llm(
535 crate::runtime::types::LlmEvent::ToolUse {
536 tool_name: call.function.name.clone(),
537 tool_id: call.id.clone(),
538 input,
539 },
540 ));
541 self.completed_tools.push(ToolCall {
542 id: call.id,
543 kind: call.kind,
544 function: call.function,
545 });
546 }
547 }
548
549 fn push_usage(&self, event: &Value, tx: &mpsc::UnboundedSender<StreamEvent>) {
550 let usage = event
551 .get("response")
552 .and_then(|r| r.get("usage"))
553 .or_else(|| event.get("usage"));
554 let input = usage
555 .and_then(|u| u.get("input_tokens"))
556 .and_then(Value::as_u64)
557 .unwrap_or(0);
558 let output = usage
559 .and_then(|u| u.get("output_tokens"))
560 .and_then(Value::as_u64)
561 .unwrap_or(0);
562 if input > 0 || output > 0 {
563 let _ = tx.send(StreamEvent::Session(crate::runtime::types::SessionEvent::Usage {
564 input_tokens: input,
565 output_tokens: output,
566 cache_read_input_tokens: 0,
567 cache_creation_input_tokens: 0,
568 cache_creation_5m: None,
569 cache_creation_1h: None,
570 model: None,
571 }));
572 }
573 }
574
575 fn finish(&mut self) {
576 for tool in self.active_tools.drain(..) {
577 if !tool.id.is_empty()
578 && !tool.name.is_empty()
579 && !self.completed_tools.iter().any(|done| done.id == tool.id)
580 {
581 self.completed_tools.push(ToolCall {
582 id: tool.id,
583 kind: "function".to_string(),
584 function: super::types::FunctionCall {
585 name: tool.name,
586 arguments: tool.arguments,
587 },
588 });
589 }
590 }
591 }
592}
593
594fn handle_events(
595 events: &[OaiEvent],
596 tx: &mpsc::UnboundedSender<StreamEvent>,
597 text_acc: &mut String,
598 tool_blocks: &mut Vec<Value>,
599 name_map: &translate::ToolNameMap,
600) {
601 for ev in events {
602 if let OaiEvent::TextDelta(t) = ev {
603 text_acc.push_str(t);
604 }
605 if let OaiEvent::ToolCallsComplete { calls, .. } = ev {
606 tool_blocks.extend(translate::tool_calls_to_content_blocks(calls, name_map));
607 }
608 if let Some(se) = translate::oai_event_to_llm(ev) {
609 let _ = tx.send(se);
610 }
611 }
612}
613
614#[cfg(test)]
615mod codex_input_messages_tests {
616 use super::*;
632 use super::super::types::{ChatMessage, FunctionCall, ToolCall};
633
634 fn sample_tool_call() -> ToolCall {
635 ToolCall {
636 id: "call_nZYquCuGUh8Qs9H51dwHMDgs".to_string(),
637 kind: "function".to_string(),
638 function: FunctionCall {
639 name: "bash".to_string(),
640 arguments: r#"{"command":"ls"}"#.to_string(),
641 },
642 }
643 }
644
645 #[test]
646 fn codex_instructions_appends_autonomous_loop_policy() {
647 let instructions = codex_instructions(&Some("Project-specific rules.".to_string()));
648 assert!(instructions.contains("Project-specific rules."));
649 assert!(instructions.contains("Do not stop at phase boundaries"));
650 assert!(instructions.contains("Do not ask the user whether to continue"));
651 assert!(instructions.contains("continue autonomously until the full requested job is complete"));
652 }
653
654 #[test]
655 fn function_call_input_omits_non_fc_id() {
656 let messages = vec![ChatMessage::assistant_tool_calls(vec![sample_tool_call()])];
657 let out = codex_input_messages(messages);
658 assert_eq!(out.len(), 1, "one tool_call → one input item");
659 let item = &out[0];
660 assert_eq!(item.get("type").and_then(Value::as_str), Some("function_call"));
661 assert!(
662 item.get("id").is_none(),
663 "must not echo a non-`fc_` id back; got {:?}",
664 item.get("id"),
665 );
666 assert_eq!(
667 item.get("call_id").and_then(Value::as_str),
668 Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
669 );
670 assert_eq!(item.get("name").and_then(Value::as_str), Some("bash"));
671 }
672
673 #[test]
674 fn function_call_input_keeps_real_fc_id() {
675 let mut call = sample_tool_call();
678 call.id = "fc_abc123".to_string();
679 let messages = vec![ChatMessage::assistant_tool_calls(vec![call])];
680 let out = codex_input_messages(messages);
681 let item = &out[0];
682 assert_eq!(item.get("id").and_then(Value::as_str), Some("fc_abc123"));
683 assert_eq!(item.get("call_id").and_then(Value::as_str), Some("fc_abc123"));
684 }
685
686 #[test]
687 fn function_call_output_round_trips_call_id() {
688 let messages = vec![ChatMessage::tool_result(
690 "call_nZYquCuGUh8Qs9H51dwHMDgs",
691 "bash",
692 "total 0",
693 )];
694 let out = codex_input_messages(messages);
695 let item = &out[0];
696 assert_eq!(
697 item.get("type").and_then(Value::as_str),
698 Some("function_call_output"),
699 );
700 assert_eq!(
701 item.get("call_id").and_then(Value::as_str),
702 Some("call_nZYquCuGUh8Qs9H51dwHMDgs"),
703 );
704 assert_eq!(item.get("output").and_then(Value::as_str), Some("total 0"));
705 }
706}
707
708#[cfg(test)]
709mod codex_decoder_tests {
710 use super::*;
717 use crate::runtime::types::{LlmEvent, SessionEvent, StreamEvent};
718
719 fn collect_events(rx: &mut mpsc::UnboundedReceiver<StreamEvent>) -> Vec<StreamEvent> {
720 let mut out = Vec::new();
721 while let Ok(ev) = rx.try_recv() {
722 out.push(ev);
723 }
724 out
725 }
726
727 fn drive(lines: &[&str]) -> (CodexSseDecoder, String, Vec<StreamEvent>) {
728 let (tx, mut rx) = mpsc::unbounded_channel();
729 let mut decoder = CodexSseDecoder::default();
730 let mut text_acc = String::new();
731 for line in lines {
732 decoder.push_line(line, &tx, &mut text_acc);
733 }
734 let events = collect_events(&mut rx);
735 (decoder, text_acc, events)
736 }
737
738 #[test]
739 fn text_delta_aggregates_into_text_acc_and_emits_text_events() {
740 let lines = [
741 r#"data: {"type":"response.output_text.delta","delta":"Hello, "}"#,
742 "",
743 r#"data: {"type":"response.output_text.delta","delta":"world!"}"#,
744 "",
745 ];
746 let (_decoder, text_acc, events) = drive(&lines);
747 assert_eq!(text_acc, "Hello, world!");
748 let texts: Vec<_> = events
749 .iter()
750 .filter_map(|e| match e {
751 StreamEvent::Llm(LlmEvent::Text(t)) => Some(t.as_str()),
752 _ => None,
753 })
754 .collect();
755 assert_eq!(texts, vec!["Hello, ", "world!"]);
756 }
757
758 #[test]
759 fn single_function_call_completes_via_output_item_done() {
760 let lines = [
761 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
762 "",
763 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\""}"#,
764 "",
765 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":":\"ls\"}"}"#,
766 "",
767 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
768 "",
769 ];
770 let (decoder, _text, events) = drive(&lines);
771
772 assert_eq!(decoder.completed_tools.len(), 1);
773 let tool = &decoder.completed_tools[0];
774 assert_eq!(tool.id, "call_abc");
775 assert_eq!(tool.function.name, "bash");
776 assert_eq!(tool.function.arguments, r#"{"cmd":"ls"}"#);
777
778 let starts: Vec<_> = events
780 .iter()
781 .filter_map(|e| match e {
782 StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
783 Some((tool_name.as_str(), tool_id.as_str()))
784 }
785 _ => None,
786 })
787 .collect();
788 assert_eq!(
789 starts,
790 vec![("bash", "call_abc")],
791 "exactly one ToolUseStart with correct tool_id"
792 );
793
794 let deltas: Vec<_> = events
797 .iter()
798 .filter_map(|e| match e {
799 StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
800 Some((tool_id.as_str(), delta.as_str()))
801 }
802 _ => None,
803 })
804 .collect();
805 assert_eq!(
806 deltas,
807 vec![("call_abc", r#"{"cmd""#), ("call_abc", r#":"ls"}"#)]
808 );
809 }
810
811 #[test]
812 fn parallel_tool_calls_indexed_separately() {
813 let lines = [
814 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
815 "",
816 r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
817 "",
818 r#"data: {"type":"response.function_call_arguments.delta","output_index":1,"delta":"{\"path\":\"a\"}"}"#,
819 "",
820 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"cmd\":\"ls\"}"}"#,
821 "",
822 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"cmd\":\"ls\"}"}}"#,
823 "",
824 r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
825 "",
826 ];
827 let (decoder, _text, _events) = drive(&lines);
828
829 assert_eq!(decoder.completed_tools.len(), 2);
830 let mut by_id: std::collections::BTreeMap<&str, &ToolCall> = std::collections::BTreeMap::new();
831 for tool in &decoder.completed_tools {
832 by_id.insert(tool.id.as_str(), tool);
833 }
834 assert_eq!(by_id["call_1"].function.name, "bash");
835 assert_eq!(by_id["call_1"].function.arguments, r#"{"cmd":"ls"}"#);
836 assert_eq!(by_id["call_2"].function.name, "read");
837 assert_eq!(by_id["call_2"].function.arguments, r#"{"path":"a"}"#);
838 }
839
840 #[test]
841 fn output_item_done_emits_tool_use_event() {
842 let lines = [
849 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash"}}"#,
850 "",
851 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"command\":\"ls\"}"}"#,
852 "",
853 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_abc","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
854 "",
855 ];
856 let (_decoder, _text, events) = drive(&lines);
857
858 let tool_uses: Vec<_> = events
859 .iter()
860 .filter_map(|e| match e {
861 StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
862 Some((tool_name.as_str(), tool_id.as_str(), input.clone()))
863 }
864 _ => None,
865 })
866 .collect();
867 assert_eq!(tool_uses.len(), 1, "expected exactly one ToolUse finalize event");
868 assert_eq!(tool_uses[0].0, "bash");
869 assert_eq!(tool_uses[0].1, "call_abc");
870 assert_eq!(
871 tool_uses[0].2,
872 serde_json::json!({"command": "ls"}),
873 "input must be parsed as a JSON Value, not a string"
874 );
875 }
876
877 #[test]
878 fn parallel_tool_calls_emit_tool_use_per_index() {
879 let lines = [
883 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash"}}"#,
884 "",
885 r#"data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read"}}"#,
886 "",
887 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}"#,
888 "",
889 r#"data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","call_id":"call_2","name":"read","arguments":"{\"path\":\"a\"}"}}"#,
890 "",
891 ];
892 let (_decoder, _text, events) = drive(&lines);
893
894 let tool_uses: Vec<_> = events
895 .iter()
896 .filter_map(|e| match e {
897 StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
898 Some((tool_name.clone(), tool_id.clone(), input.clone()))
899 }
900 _ => None,
901 })
902 .collect();
903
904 assert_eq!(tool_uses.len(), 2, "one ToolUse finalize per parallel call");
905 let by_id: std::collections::BTreeMap<&str, &(String, String, serde_json::Value)> =
906 tool_uses.iter().map(|t| (t.1.as_str(), t)).collect();
907 assert_eq!(by_id["call_1"].0, "bash");
908 assert_eq!(by_id["call_1"].2, serde_json::json!({"command": "ls"}));
909 assert_eq!(by_id["call_2"].0, "read");
910 assert_eq!(by_id["call_2"].2, serde_json::json!({"path": "a"}));
911 }
912
913 #[test]
914 fn malformed_arguments_emit_tool_use_with_parse_error() {
915 let lines = [
920 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash"}}"#,
921 "",
922 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_bad","name":"bash","arguments":"{not json"}}"#,
923 "",
924 ];
925 let (_decoder, _text, events) = drive(&lines);
926
927 let tool_use = events.iter().find_map(|e| match e {
928 StreamEvent::Llm(LlmEvent::ToolUse { input, .. }) => Some(input.clone()),
929 _ => None,
930 });
931 let input = tool_use.expect("ToolUse event missing");
932 assert!(
933 input.get("__parse_error").and_then(Value::as_str).is_some(),
934 "malformed arguments must surface __parse_error, got {input}"
935 );
936 }
937
938 #[test]
939 fn response_completed_emits_usage_event() {
940 let lines = [
941 r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":42,"output_tokens":17}}}"#,
942 "",
943 ];
944 let (_decoder, _text, events) = drive(&lines);
945 let usage = events.iter().find_map(|e| match e {
946 StreamEvent::Session(SessionEvent::Usage {
947 input_tokens,
948 output_tokens,
949 ..
950 }) => Some((*input_tokens, *output_tokens)),
951 _ => None,
952 });
953 assert_eq!(usage, Some((42, 17)));
954 }
955
956 #[test]
957 fn response_completed_with_zero_usage_emits_nothing() {
958 let lines = [
959 r#"data: {"type":"response.completed","response":{"usage":{"input_tokens":0,"output_tokens":0}}}"#,
960 "",
961 ];
962 let (_decoder, _text, events) = drive(&lines);
963 let any_usage = events.iter().any(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })));
964 assert!(!any_usage, "zero-token usage should be suppressed");
965 }
966
967 #[test]
968 fn done_marker_finishes_decoder() {
969 let lines = [
970 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_x","name":"bash"}}"#,
971 "",
972 r#"data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{}"}"#,
973 "",
974 "data: [DONE]",
975 "",
976 ];
977 let (decoder, _text, _events) = drive(&lines);
978 assert_eq!(decoder.completed_tools.len(), 1);
980 assert_eq!(decoder.completed_tools[0].id, "call_x");
981 assert_eq!(decoder.completed_tools[0].function.arguments, "{}");
982 }
983
984 #[test]
985 fn finish_is_idempotent_no_double_emit() {
986 let lines = [
987 r#"data: {"type":"response.output_item.done","output_index":0,"item":{"type":"function_call","call_id":"call_y","name":"bash","arguments":"{}"}}"#,
988 "",
989 ];
990 let (mut decoder, _text, _events) = drive(&lines);
991 assert_eq!(decoder.completed_tools.len(), 1);
992
993 decoder.finish();
995 assert_eq!(
996 decoder.completed_tools.len(),
997 1,
998 "finish() called twice must not double-emit"
999 );
1000 }
1001
1002 #[test]
1003 fn finish_drains_active_tools_for_state_hygiene() {
1004 let lines = [
1009 r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","call_id":"call_z","name":"bash"}}"#,
1010 "",
1011 "data: [DONE]",
1012 "",
1013 ];
1014 let (decoder, _text, _events) = drive(&lines);
1015 assert_eq!(decoder.completed_tools.len(), 1);
1016 assert!(
1017 decoder.active_tools.is_empty(),
1018 "active_tools must be drained after finish()"
1019 );
1020 }
1021
1022 #[test]
1023 fn unknown_event_types_are_ignored() {
1024 let lines = [
1025 r#"data: {"type":"response.future_unknown_event","payload":{"x":1}}"#,
1026 "",
1027 r#"data: {"type":"response.output_text.delta","delta":"hi"}"#,
1028 "",
1029 ];
1030 let (_decoder, text_acc, _events) = drive(&lines);
1031 assert_eq!(text_acc, "hi");
1032 }
1033}