sparrow_providers/openai_compat.rs
1use async_trait::async_trait;
2use futures::stream::{self, StreamExt};
3use reqwest::Client;
4use serde_json::json;
5use std::collections::HashMap;
6
7use super::{Brain, BrainEvent, BrainRequest, BrainStream, ContentBlock, LatencyClass, ModelCaps};
8
/// Process-wide, monotonically increasing counter behind synthesized tool-call
/// ids (B8). Calls recovered from inline markup and native calls that arrive
/// without an id both draw from it, so two turns in one run can never collide
/// on an id such as `markup-call-0` and confuse id-keyed approval/replay state.
static SYNTH_TOOL_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

/// Returns the next synthesized tool-call id, formatted as `<kind>-call-<n>`.
///
/// `n` is taken from the shared `SYNTH_TOOL_ID` counter, so the numeric suffix
/// is unique across every `kind` for the lifetime of the process.
fn next_synth_id(kind: &str) -> String {
    use std::sync::atomic::Ordering;

    // Only uniqueness of the returned number matters, so a relaxed
    // read-modify-write is sufficient.
    let seq = SYNTH_TOOL_ID.fetch_add(1, Ordering::Relaxed);
    format!("{}-call-{}", kind, seq)
}
18
/// Collects tool-call accumulator indices and returns them in ascending order.
///
/// `ToolUseEnd` events are emitted by walking this list, so they follow the
/// order in which the model declared its calls (index order) rather than the
/// arbitrary order a `HashMap` yields its keys in (A1/A2).
fn sorted_indices(keys: impl Iterator<Item = u64>) -> Vec<u64> {
    let mut ascending = keys.collect::<Vec<u64>>();
    // Stability is irrelevant for plain integers, so use the unstable sort.
    ascending.sort_unstable();
    ascending
}
27
28/// OpenAI-compatible adapter. Covers OpenAI, Groq, NVIDIA NIM, Together, Cerebras,
29/// OpenRouter, NovitaAI, Nous Portal, HuggingFace, Ollama, and custom endpoints.
30pub struct OpenAICompatAdapter {
31 model: String,
32 api_key: String,
33 base_url: String,
34 client: Client,
35 caps: ModelCaps,
36 echo_reasoning: bool,
37}
38
39impl OpenAICompatAdapter {
40 pub fn new(model: &str, api_key: impl Into<String>, base_url: &str) -> Self {
41 let model = model.to_string();
42 Self {
43 model,
44 api_key: api_key.into(),
45 base_url: base_url.to_string(),
46 client: Client::new(),
47 caps: ModelCaps::default(),
48 echo_reasoning: true,
49 }
50 }
51
52 pub fn with_caps(mut self, caps: ModelCaps) -> Self {
53 self.caps = caps;
54 self
55 }
56
57 pub fn with_echo_reasoning(mut self, echo_reasoning: bool) -> Self {
58 self.echo_reasoning = echo_reasoning;
59 self
60 }
61
62 /// Create an Ollama adapter (OpenAI-compatible API on localhost)
63 pub fn ollama(model: &str, base_url: &str) -> Self {
64 // Ollama doesn't require an API key
65 Self::new(model, "ollama", base_url).with_caps(ModelCaps {
66 context_window: 32_768,
67 max_output: 8_000,
68 tools: true,
69 vision: false,
70 cost_input_per_mtok: 0.0,
71 cost_output_per_mtok: 0.0,
72 latency: LatencyClass::Medium,
73 })
74 }
75}
76
77fn build_chat_body(model: &str, req: &BrainRequest, echo_reasoning: bool) -> serde_json::Value {
78 let mut messages: Vec<serde_json::Value> = Vec::new();
79
80 // Add system message
81 if let Some(sys) = &req.system {
82 messages.push(json!({
83 "role": "system",
84 "content": sys,
85 }));
86 }
87
88 // Convert messages
89 for msg in &req.messages {
90 if msg.role == "system" {
91 messages.push(json!({
92 "role": "system",
93 "content": msg.content.iter()
94 .filter_map(|b| match b {
95 ContentBlock::Text { text } => Some(text.clone()),
96 _ => None,
97 })
98 .collect::<Vec<_>>()
99 .join("\n"),
100 }));
101 continue;
102 }
103
104 let mut content: Vec<serde_json::Value> = Vec::new();
105 let mut tool_calls: Vec<serde_json::Value> = Vec::new();
106 let mut reasoning_buf = String::new();
107 let mut emitted_tool_result = false;
108
109 for block in &msg.content {
110 match block {
111 ContentBlock::Text { text } => {
112 content.push(json!({"type": "text", "text": text}));
113 }
114 ContentBlock::Image { source } => {
115 content.push(json!({
116 "type": "image_url",
117 "image_url": {
118 "url": image_source_url(source),
119 }
120 }));
121 }
122 ContentBlock::Reasoning { text } if echo_reasoning => {
123 // DeepSeek / Moonshot / Qwen "thinking mode" require the
124 // model's previous reasoning_content to be echoed back
125 // on the next turn or the API rejects with 400. We aggregate
126 // all reasoning blocks of this message and ship them as a
127 // single `reasoning_content` field.
128 if !reasoning_buf.is_empty() {
129 reasoning_buf.push('\n');
130 }
131 reasoning_buf.push_str(text);
132 }
133 ContentBlock::Reasoning { .. } => {}
134 ContentBlock::ToolUse { id, name, input } => {
135 tool_calls.push(json!({
136 "id": id,
137 "type": "function",
138 "function": {
139 "name": name,
140 "arguments": serde_json::to_string(input).unwrap_or_default(),
141 }
142 }));
143 }
144 ContentBlock::ToolResult {
145 tool_use_id,
146 content: tool_content,
147 ..
148 } => {
149 let text = tool_content
150 .iter()
151 .filter_map(|b| match b {
152 ContentBlock::Text { text } => Some(text.clone()),
153 _ => None,
154 })
155 .collect::<Vec<_>>()
156 .join("\n");
157 messages.push(json!({
158 "role": "tool",
159 "tool_call_id": tool_use_id,
160 "content": text,
161 }));
162 emitted_tool_result = true;
163 continue; // tool results are separate messages
164 }
165 }
166 }
167
168 if emitted_tool_result && content.is_empty() && tool_calls.is_empty() {
169 continue;
170 }
171
172 let mut msg_json = json!({ "role": msg.role });
173
174 if !tool_calls.is_empty() {
175 msg_json["tool_calls"] = json!(tool_calls);
176 }
177 if !content.is_empty() {
178 if content.len() == 1 && content[0]["type"] == "text" {
179 msg_json["content"] = json!(content[0]["text"]);
180 } else {
181 msg_json["content"] = json!(content);
182 }
183 }
184 if !reasoning_buf.is_empty() && msg.role == "assistant" {
185 msg_json["reasoning_content"] = json!(reasoning_buf);
186 }
187
188 messages.push(msg_json);
189 }
190
191 // Build tools
192 let tools: Vec<serde_json::Value> = req
193 .tools
194 .iter()
195 .map(|t| {
196 json!({
197 "type": "function",
198 "function": {
199 "name": t.name,
200 "description": t.description,
201 "parameters": t.input_schema,
202 }
203 })
204 })
205 .collect();
206
207 let mut body = json!({
208 "model": model,
209 "messages": messages,
210 "stream": true,
211 "stream_options": {
212 "include_usage": true
213 },
214 "temperature": req.temperature,
215 });
216
217 if req.max_tokens > 0 {
218 body["max_tokens"] = json!(req.max_tokens);
219 }
220 if !tools.is_empty() {
221 body["tools"] = json!(tools);
222 }
223 if !req.stop.is_empty() {
224 body["stop"] = json!(req.stop);
225 }
226 // NOTE: we deliberately never emit `prompt_cache_key` / `prompt_cache_retention`
227 // here. This one adapter fronts dozens of OpenAI-compatible endpoints and
228 // proxies (opencode-go, NVIDIA NIM, Groq, stepfun, Ollama, …). Many reject
229 // unknown parameters with HTTP 400 — e.g. opencode-go:
230 // "Validation: Unsupported parameter(s): prompt_cache_retention,
231 // prompt_cache_key"
232 // which made the first model in every routing chain fail and waste a turn.
233 // Prompt caching stays an Anthropic-only feature (handled in anthropic.rs,
234 // which keys off `req.cache.enabled` independently). The engine may still
235 // set `req.cache.enabled` for the run; this adapter simply ignores it.
236
237 body
238}
239
240fn image_source_url(source: &super::ImageSource) -> String {
241 match source {
242 super::ImageSource::Base64 { media_type, data } => {
243 format!("data:{};base64,{}", media_type, data)
244 }
245 super::ImageSource::Url { url } => url.clone(),
246 }
247}
248
249#[async_trait]
250impl Brain for OpenAICompatAdapter {
251 fn id(&self) -> &str {
252 &self.model
253 }
254
255 fn caps(&self) -> ModelCaps {
256 self.caps.clone()
257 }
258
259 async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
260 let body = build_chat_body(&self.model, &req, self.echo_reasoning);
261
262 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
263
264 let response = self
265 .client
266 .post(&url)
267 .header("Authorization", format!("Bearer {}", self.api_key))
268 .json(&body)
269 .send()
270 .await?;
271
272 if !response.status().is_success() {
273 let status = response.status().as_u16();
274 let body = response.text().await.unwrap_or_default();
275 return Err(anyhow::anyhow!(
276 "OpenAI-compatible API error {}: {}",
277 status,
278 body
279 ));
280 }
281
282 #[derive(Default)]
283 struct ToolCallState {
284 id: String,
285 started: bool,
286 }
287
288 let stream = response.bytes_stream();
289
290 // SSE state: tool-call accumulator + line buffer that survives chunk
291 // boundaries. Without the buffer, a JSON event split across two TCP
292 // chunks was parsed in halves and silently dropped — producing the
293 // "à rebours" → "àours" mangling.
294 struct SseState {
295 tools: HashMap<u64, ToolCallState>,
296 lines: super::sse_buffer::LineBuffer,
297 /// Accumulated assistant `content` text for this completion. Used
298 /// to recover tool calls a provider emitted as inline XML/DSML
299 /// markup inside `content` rather than as native `tool_calls`
300 /// (see provider::tool_markup).
301 content_buf: String,
302 /// True once we've decided the content is inline tool-call markup
303 /// and should be suppressed from the visible text stream.
304 suppress_text: bool,
305 /// Text held while the beginning of `content` is ambiguous: it may
306 /// still become inline tool-call markup once more chunks arrive.
307 pending_text: String,
308 /// B4: true once reasoning has been seen on the streaming `delta`
309 /// path. Providers also repeat the full reasoning under
310 /// `message.reasoning_content` on the final chunk; without this
311 /// flag the engine concatenated both and echoed doubled reasoning
312 /// back (context bloat + 400 risk). We take delta OR message,
313 /// never both.
314 reasoning_seen: bool,
315 }
316
317 let event_stream = stream
318 .scan(
319 SseState {
320 tools: HashMap::new(),
321 lines: super::sse_buffer::LineBuffer::new(),
322 content_buf: String::new(),
323 suppress_text: false,
324 pending_text: String::new(),
325 reasoning_seen: false,
326 },
327 |state, chunk| {
328 let events: Vec<BrainEvent> = match chunk {
329 Ok(bytes) => {
330 let lines = state.lines.push(&bytes);
331 let tool_state = &mut state.tools;
332 let mut parsed = Vec::new();
333 for line in lines {
334 let line = line.trim();
335 if line.is_empty() || !line.starts_with("data: ") {
336 continue;
337 }
338 let data = &line[6..];
339 if data == "[DONE]" {
340 continue;
341 }
342 let event: serde_json::Value = match serde_json::from_str(data) {
343 Ok(v) => v,
344 Err(e) => {
345 tracing::debug!(
346 "JSON parse error: {} — data: {}",
347 e,
348 &data[..data.len().min(200)]
349 );
350 continue;
351 }
352 };
353
354 if let Some(choices) = event["choices"].as_array() {
355 for choice in choices {
356 if let Some(delta) = choice["delta"].as_object() {
357 if let Some(text) =
358 delta.get("content").and_then(|v| v.as_str())
359 {
360 if !text.is_empty() {
361 state.content_buf.push_str(text);
362 state.pending_text.push_str(text);
363 // If this completion's content turns
364 // out to be inline tool-call markup
365 // (DeepSeek DSML / Anthropic-style
366 // <invoke>), suppress it from the
367 // visible text stream — it'll be
368 // converted to real tool calls at
369 // finish_reason.
370 if !state.suppress_text
371 && super::tool_markup::looks_like_tool_markup(
372 &state.content_buf,
373 )
374 {
375 state.suppress_text = true;
376 state.pending_text.clear();
377 }
378 if !state.suppress_text
379 && !super::tool_markup::could_be_tool_markup_prefix(
380 &state.content_buf,
381 )
382 && !state.pending_text.is_empty()
383 {
384 parsed.push(BrainEvent::TextDelta(
385 std::mem::take(&mut state.pending_text),
386 ));
387 }
388 }
389 }
390 // DeepSeek / Moonshot thinking-mode emit
391 // reasoning trace alongside content. Capture
392 // it as a dedicated event so the engine can
393 // echo it back on the next turn (required
394 // by DeepSeek's contract).
395 // Several providers report this under
396 // different keys; check the known aliases.
397 for key in [
398 "reasoning_content",
399 "reasoning",
400 "thinking",
401 "thought",
402 ] {
403 if let Some(rtext) =
404 delta.get(key).and_then(|v| v.as_str())
405 {
406 if !rtext.is_empty() {
407 state.reasoning_seen = true;
408 parsed.push(BrainEvent::ReasoningDelta(
409 rtext.to_string(),
410 ));
411 }
412 }
413 }
414 }
415 // Some providers bundle the reasoning under
416 // `message.reasoning_content` on the final chunk
417 // rather than streaming it through `delta`. B4:
418 // only use it when nothing streamed via delta —
419 // otherwise it's the SAME trace repeated and
420 // concatenating both doubles it.
421 if !state.reasoning_seen {
422 if let Some(msg_obj) =
423 choice.get("message").and_then(|v| v.as_object())
424 {
425 for key in
426 ["reasoning_content", "reasoning", "thinking"]
427 {
428 if let Some(rtext) =
429 msg_obj.get(key).and_then(|v| v.as_str())
430 {
431 if !rtext.is_empty() {
432 state.reasoning_seen = true;
433 parsed.push(BrainEvent::ReasoningDelta(
434 rtext.to_string(),
435 ));
436 }
437 }
438 }
439 }
440 }
441 if let Some(delta) = choice["delta"].as_object() {
442 // (Re-open the original tool_calls block.)
443 let _ = delta; // keep this branch syntactically anchored
444 if let Some(tool_calls) =
445 delta.get("tool_calls").and_then(|v| v.as_array())
446 {
447 for tc in tool_calls {
448 let idx = tc
449 .get("index")
450 .and_then(|v| v.as_u64())
451 .unwrap_or(0);
452 let id = tc
453 .get("id")
454 .and_then(|v| v.as_str())
455 .map(|s| s.to_string());
456 let state = tool_state.entry(idx).or_default();
457 if let Some(id) = id {
458 state.id = id;
459 }
460 if let Some(func) = tc
461 .get("function")
462 .and_then(|v| v.as_object())
463 {
464 if let Some(name) = func
465 .get("name")
466 .and_then(|v| v.as_str())
467 {
468 if !state.started {
469 if state.id.is_empty() {
470 // B8: unique even when
471 // the provider omits the
472 // id, across turns.
473 state.id =
474 next_synth_id("tool");
475 }
476 state.started = true;
477 parsed.push(
478 BrainEvent::ToolUseStart {
479 id: state.id.clone(),
480 name: name.to_string(),
481 },
482 );
483 }
484 }
485 if let Some(args) = func
486 .get("arguments")
487 .and_then(|v| v.as_str())
488 {
489 if !state.id.is_empty()
490 && !args.is_empty()
491 {
492 parsed.push(
493 BrainEvent::ToolUseDelta {
494 id: state.id.clone(),
495 json: args.to_string(),
496 },
497 );
498 }
499 }
500 }
501 }
502 }
503 }
504
505 if let Some(reason) =
506 choice.get("finish_reason").and_then(|v| v.as_str())
507 {
508 if !reason.is_empty() && reason != "null" {
509 let stop = match reason {
510 "stop" => {
511 // A2: a provider may stream native
512 // tool_calls and then finish with
513 // "stop" (not "tool_calls"). Drain
514 // any pending native calls FIRST so
515 // they actually execute instead of
516 // being silently dropped.
517 let mut native = false;
518 for idx in sorted_indices(
519 tool_state.keys().copied(),
520 ) {
521 if let Some(st) =
522 tool_state.remove(&idx)
523 {
524 if !st.id.is_empty() {
525 parsed.push(
526 BrainEvent::ToolUseEnd {
527 id: st.id,
528 },
529 );
530 native = true;
531 }
532 }
533 }
534 // Otherwise recover tool calls a
535 // provider emitted as inline
536 // XML/DSML markup in `content` (with
537 // finish_reason "stop") instead of
538 // native tool_calls — without this
539 // the call leaks as raw text and
540 // never runs.
541 let calls = if !native
542 && super::tool_markup::looks_like_tool_markup(
543 &state.content_buf,
544 )
545 {
546 super::tool_markup::extract_tool_calls(
547 &state.content_buf,
548 )
549 } else {
550 Vec::new()
551 };
552 if native {
553 sparrow_core::event::StopReason::ToolUse
554 } else if calls.is_empty() {
555 if !state.suppress_text
556 && !state.pending_text.is_empty()
557 {
558 parsed.push(
559 BrainEvent::TextDelta(
560 std::mem::take(
561 &mut state.pending_text,
562 ),
563 ),
564 );
565 }
566 sparrow_core::event::StopReason::EndTurn
567 } else {
568 for call in calls.into_iter() {
569 // B8: unique id per
570 // synthesized call so two
571 // markup turns in one run
572 // never collide.
573 let id = next_synth_id("markup");
574 parsed.push(
575 BrainEvent::ToolUseStart {
576 id: id.clone(),
577 name: call.name,
578 },
579 );
580 parsed.push(
581 BrainEvent::ToolUseDelta {
582 id: id.clone(),
583 json: call
584 .args
585 .to_string(),
586 },
587 );
588 parsed.push(
589 BrainEvent::ToolUseEnd { id },
590 );
591 }
592 sparrow_core::event::StopReason::ToolUse
593 }
594 }
595 "length" => sparrow_core::event::StopReason::MaxTokens,
596 "tool_calls" => {
597 // A1/A2: emit Ends in index order,
598 // not HashMap-arbitrary order.
599 for idx in sorted_indices(
600 tool_state.keys().copied(),
601 ) {
602 if let Some(st) =
603 tool_state.remove(&idx)
604 {
605 if !st.id.is_empty() {
606 parsed.push(
607 BrainEvent::ToolUseEnd {
608 id: st.id,
609 },
610 );
611 }
612 }
613 }
614 sparrow_core::event::StopReason::ToolUse
615 }
616 s => sparrow_core::event::StopReason::StopSequence(
617 s.to_string(),
618 ),
619 };
620 parsed.push(BrainEvent::Done(stop));
621 }
622 }
623 }
624 }
625
626 if let Some(usage) = event.get("usage").and_then(|u| u.as_object())
627 {
628 // Use .get() — indexing a serde_json::Map with [] panics on a
629 // missing key, and some providers (e.g. MiniMax) omit fields.
630 parsed.push(BrainEvent::Usage(sparrow_core::event::TokenUsage {
631 input: usage
632 .get("prompt_tokens")
633 .and_then(|v| v.as_u64())
634 .unwrap_or(0),
635 output: usage
636 .get("completion_tokens")
637 .and_then(|v| v.as_u64())
638 .unwrap_or(0),
639 }));
640 }
641 }
642 parsed
643 }
644 Err(e) => vec![BrainEvent::Error(format!("stream error: {}", e))],
645 };
646 futures::future::ready(Some(stream::iter(events)))
647 },
648 )
649 .flatten();
650
651 Ok(Box::pin(event_stream))
652 }
653}
654
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Msg, PromptCacheConfig, PromptCacheTtl};
    use futures::StreamExt;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    /// Shorthand for a visible text block.
    fn text_block(text: &str) -> ContentBlock {
        ContentBlock::Text { text: text.into() }
    }

    /// Shorthand for a reasoning block.
    fn reasoning_block(text: &str) -> ContentBlock {
        ContentBlock::Reasoning { text: text.into() }
    }

    /// Shorthand for an `fs_write` tool call targeting `path`.
    fn fs_write_call(id: &str, path: &str) -> ContentBlock {
        ContentBlock::ToolUse {
            id: id.into(),
            name: "fs_write".into(),
            input: serde_json::json!({ "path": path }),
        }
    }

    /// A request holding exactly one message; every other field is default.
    fn request_with(role: &str, content: Vec<ContentBlock>) -> BrainRequest {
        BrainRequest {
            messages: vec![Msg {
                role: role.into(),
                content,
            }],
            ..BrainRequest::default()
        }
    }

    #[test]
    fn openai_chat_body_never_sends_prompt_cache_params() {
        // Regression for the v0.8.x 400: many OpenAI-compatible proxies
        // (opencode-go, …) reject `prompt_cache_key`/`prompt_cache_retention`
        // with HTTP 400, which made the first routed model fail every run.
        // These params are Anthropic-only (see anthropic.rs); this adapter
        // must never emit them — even when the run enabled caching.
        let mut req = request_with("user", vec![text_block("dynamic task")]);
        req.system = Some("stable sparrow system".into());
        req.cache = PromptCacheConfig {
            enabled: true,
            ttl: PromptCacheTtl::OneHour,
            key: Some("sparrow-repo-abc".into()),
        };

        let body = build_chat_body("gpt-test", &req, true);

        assert!(
            body.get("prompt_cache_key").is_none(),
            "prompt_cache_key must never be sent to an OpenAI-compatible endpoint"
        );
        assert!(
            body.get("prompt_cache_retention").is_none(),
            "prompt_cache_retention must never be sent to an OpenAI-compatible endpoint"
        );
    }

    #[test]
    fn openai_chat_body_serializes_image_blocks() {
        let png = ContentBlock::Image {
            source: crate::ImageSource::Base64 {
                media_type: "image/png".into(),
                data: "iVBORw0KGgo=".into(),
            },
        };
        let req = request_with("user", vec![text_block("what is in this image?"), png]);

        let body = build_chat_body("gpt-test", &req, true);
        let parts = &body["messages"][0]["content"];

        assert_eq!(parts[0]["type"], "text");
        assert_eq!(parts[1]["type"], "image_url");
        assert_eq!(
            parts[1]["image_url"]["url"],
            "data:image/png;base64,iVBORw0KGgo="
        );
    }

    #[test]
    fn openai_chat_body_reinjects_assistant_reasoning_content() {
        let req = request_with(
            "assistant",
            vec![
                reasoning_block("opaque provider reasoning"),
                text_block("visible answer"),
            ],
        );

        let body = build_chat_body("deepseek-test", &req, true);
        let assistant = &body["messages"][0];

        assert_eq!(assistant["content"], "visible answer");
        assert_eq!(assistant["reasoning_content"], "opaque provider reasoning");
    }

    #[test]
    fn openai_chat_body_can_disable_reasoning_echo() {
        let req = request_with(
            "assistant",
            vec![
                reasoning_block("provider-private reasoning"),
                text_block("visible answer"),
            ],
        );

        let body = build_chat_body("provider-no-echo", &req, false);
        let assistant = &body["messages"][0];

        assert_eq!(assistant["content"], "visible answer");
        assert!(
            assistant.get("reasoning_content").is_none(),
            "provider flagged echo_reasoning=false must not receive reasoning_content"
        );
    }

    #[test]
    fn multi_tool_turn_is_one_assistant_message_with_reasoning() {
        // Regression for the v0.5.5 fix: a single model turn that emits N tool
        // calls must serialize as ONE assistant message carrying
        // reasoning_content + a tool_calls array of length N. Splitting it into
        // one message per tool dropped reasoning_content from the 2nd+ calls,
        // which DeepSeek/Qwen/Moonshot thinking-mode rejects with HTTP 400 and
        // which aborted multi-file tasks half-way.
        let req = request_with(
            "assistant",
            vec![
                reasoning_block("thinking about two files"),
                fs_write_call("call_0", "reverse.py"),
                fs_write_call("call_1", "test_reverse.py"),
            ],
        );

        let body = build_chat_body("deepseek-test", &req, true);
        let sent = body["messages"].as_array().unwrap();

        // Exactly one assistant message, carrying the reasoning.
        assert_eq!(sent.len(), 1);
        assert_eq!(sent[0]["reasoning_content"], "thinking about two files");

        // Both tool calls live in that message's single tool_calls array.
        let calls = sent[0]["tool_calls"].as_array().unwrap();
        let ids: Vec<_> = calls.iter().map(|call| call["id"].as_str()).collect();
        assert_eq!(ids, [Some("call_0"), Some("call_1")]);
        assert_eq!(calls[0]["function"]["name"], "fs_write");
    }

    #[tokio::test]
    async fn b1_partial_markup_stream_never_emits_visible_text() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // One-shot fake provider: reads the request, then replies with an SSE
        // body whose content is DSML tool markup split across several deltas.
        let server = tokio::spawn(async move {
            let (mut socket, _) = listener.accept().await.unwrap();
            let mut request = [0_u8; 4096];
            let _ = socket.read(&mut request).await.unwrap();

            let fragments = [
                "<",
                "||DSML||invoke name=\"read\">",
                "<||DSML||parameter name=\"file_path\" string=\"true\">",
                "config.py",
                "</||DSML||parameter>",
                "</||DSML||invoke>",
            ];
            let mut sse: String = fragments
                .iter()
                .map(|fragment| {
                    let event = serde_json::json!({
                        "choices": [{
                            "delta": {"content": *fragment},
                            "finish_reason": null
                        }]
                    });
                    format!("data: {}\n\n", event)
                })
                .collect();
            sse.push_str("data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n");

            let response = format!(
                "HTTP/1.1 200 OK\r\ncontent-type: text/event-stream\r\ncontent-length: {}\r\n\r\n{}",
                sse.len(),
                sse
            );
            socket.write_all(response.as_bytes()).await.unwrap();
        });

        let base_url = format!("http://{}", addr);
        let adapter = OpenAICompatAdapter::new("deepseek-test", "test-key", &base_url);
        let mut events = adapter.complete(BrainRequest::default()).await.unwrap();

        let mut visible = String::new();
        let mut tool_name = None;
        let mut tool_args = String::new();
        let mut done = None;
        while let Some(event) = events.next().await {
            match event {
                BrainEvent::TextDelta(delta) => visible.push_str(&delta),
                BrainEvent::ToolUseStart { name, .. } => tool_name = Some(name),
                BrainEvent::ToolUseDelta { json, .. } => tool_args.push_str(&json),
                BrainEvent::ToolUseEnd { .. } => {}
                BrainEvent::Done(reason) => done = Some(reason),
                other => panic!("unexpected event: {other:?}"),
            }
        }
        server.await.unwrap();

        assert_eq!(
            visible, "",
            "partial inline markup must not leak as visible text"
        );
        assert_eq!(tool_name.as_deref(), Some("read"));
        let args: serde_json::Value = serde_json::from_str(&tool_args).unwrap();
        assert_eq!(args["file_path"], "config.py");
        assert!(matches!(done, Some(sparrow_core::event::StopReason::ToolUse)));
    }
}
890}