Skip to main content

sparrow_providers/
openai_compat.rs

1use async_trait::async_trait;
2use futures::stream::{self, StreamExt};
3use reqwest::Client;
4use serde_json::json;
5use std::collections::HashMap;
6
7use super::{Brain, BrainEvent, BrainRequest, BrainStream, ContentBlock, LatencyClass, ModelCaps};
8
/// Process-wide, monotonically increasing sequence used to mint ids for
/// synthesized tool calls (B8). Both markup-derived calls and native calls
/// that arrive without an id draw from this one counter, so two turns in the
/// same run can never both produce e.g. `markup-call-0` and confuse id-keyed
/// approval/replay state.
static SYNTH_TOOL_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

/// Returns a fresh synthesized tool-call id of the form `{kind}-call-{n}`,
/// where `n` is the next value of the shared counter.
fn next_synth_id(kind: &str) -> String {
    use std::sync::atomic::Ordering;

    // Only uniqueness matters here, not ordering relative to other memory
    // operations, so a relaxed increment is sufficient.
    let seq = SYNTH_TOOL_ID.fetch_add(1, Ordering::Relaxed);
    format!("{}-call-{}", kind, seq)
}
18
/// Collects tool-call accumulator indices and returns them in ascending
/// order. Callers use this to emit `ToolUseEnd` events in the order the model
/// declared its calls (by index) instead of whatever order a `HashMap`
/// happens to iterate in (A1/A2).
fn sorted_indices(keys: impl Iterator<Item = u64>) -> Vec<u64> {
    let mut ordered = keys.collect::<Vec<u64>>();
    // Plain integers: stability is irrelevant, so use the non-allocating sort.
    ordered.sort_unstable();
    ordered
}
27
28/// OpenAI-compatible adapter. Covers OpenAI, Groq, NVIDIA NIM, Together, Cerebras,
29/// OpenRouter, NovitaAI, Nous Portal, HuggingFace, Ollama, and custom endpoints.
30pub struct OpenAICompatAdapter {
31    model: String,
32    api_key: String,
33    base_url: String,
34    client: Client,
35    caps: ModelCaps,
36    echo_reasoning: bool,
37}
38
39impl OpenAICompatAdapter {
40    pub fn new(model: &str, api_key: impl Into<String>, base_url: &str) -> Self {
41        let model = model.to_string();
42        Self {
43            model,
44            api_key: api_key.into(),
45            base_url: base_url.to_string(),
46            client: Client::new(),
47            caps: ModelCaps::default(),
48            echo_reasoning: true,
49        }
50    }
51
52    pub fn with_caps(mut self, caps: ModelCaps) -> Self {
53        self.caps = caps;
54        self
55    }
56
57    pub fn with_echo_reasoning(mut self, echo_reasoning: bool) -> Self {
58        self.echo_reasoning = echo_reasoning;
59        self
60    }
61
62    /// Create an Ollama adapter (OpenAI-compatible API on localhost)
63    pub fn ollama(model: &str, base_url: &str) -> Self {
64        // Ollama doesn't require an API key
65        Self::new(model, "ollama", base_url).with_caps(ModelCaps {
66            context_window: 32_768,
67            max_output: 8_000,
68            tools: true,
69            vision: false,
70            cost_input_per_mtok: 0.0,
71            cost_output_per_mtok: 0.0,
72            latency: LatencyClass::Medium,
73        })
74    }
75}
76
77fn build_chat_body(model: &str, req: &BrainRequest, echo_reasoning: bool) -> serde_json::Value {
78    let mut messages: Vec<serde_json::Value> = Vec::new();
79
80    // Add system message
81    if let Some(sys) = &req.system {
82        messages.push(json!({
83            "role": "system",
84            "content": sys,
85        }));
86    }
87
88    // Convert messages
89    for msg in &req.messages {
90        if msg.role == "system" {
91            messages.push(json!({
92                "role": "system",
93                "content": msg.content.iter()
94                    .filter_map(|b| match b {
95                        ContentBlock::Text { text } => Some(text.clone()),
96                        _ => None,
97                    })
98                    .collect::<Vec<_>>()
99                    .join("\n"),
100            }));
101            continue;
102        }
103
104        let mut content: Vec<serde_json::Value> = Vec::new();
105        let mut tool_calls: Vec<serde_json::Value> = Vec::new();
106        let mut reasoning_buf = String::new();
107        let mut emitted_tool_result = false;
108
109        for block in &msg.content {
110            match block {
111                ContentBlock::Text { text } => {
112                    content.push(json!({"type": "text", "text": text}));
113                }
114                ContentBlock::Image { source } => {
115                    content.push(json!({
116                        "type": "image_url",
117                        "image_url": {
118                            "url": image_source_url(source),
119                        }
120                    }));
121                }
122                ContentBlock::Reasoning { text } if echo_reasoning => {
123                    // DeepSeek / Moonshot / Qwen "thinking mode" require the
124                    // model's previous reasoning_content to be echoed back
125                    // on the next turn or the API rejects with 400. We aggregate
126                    // all reasoning blocks of this message and ship them as a
127                    // single `reasoning_content` field.
128                    if !reasoning_buf.is_empty() {
129                        reasoning_buf.push('\n');
130                    }
131                    reasoning_buf.push_str(text);
132                }
133                ContentBlock::Reasoning { .. } => {}
134                ContentBlock::ToolUse { id, name, input } => {
135                    tool_calls.push(json!({
136                        "id": id,
137                        "type": "function",
138                        "function": {
139                            "name": name,
140                            "arguments": serde_json::to_string(input).unwrap_or_default(),
141                        }
142                    }));
143                }
144                ContentBlock::ToolResult {
145                    tool_use_id,
146                    content: tool_content,
147                    ..
148                } => {
149                    let text = tool_content
150                        .iter()
151                        .filter_map(|b| match b {
152                            ContentBlock::Text { text } => Some(text.clone()),
153                            _ => None,
154                        })
155                        .collect::<Vec<_>>()
156                        .join("\n");
157                    messages.push(json!({
158                        "role": "tool",
159                        "tool_call_id": tool_use_id,
160                        "content": text,
161                    }));
162                    emitted_tool_result = true;
163                    continue; // tool results are separate messages
164                }
165            }
166        }
167
168        if emitted_tool_result && content.is_empty() && tool_calls.is_empty() {
169            continue;
170        }
171
172        let mut msg_json = json!({ "role": msg.role });
173
174        if !tool_calls.is_empty() {
175            msg_json["tool_calls"] = json!(tool_calls);
176        }
177        if !content.is_empty() {
178            if content.len() == 1 && content[0]["type"] == "text" {
179                msg_json["content"] = json!(content[0]["text"]);
180            } else {
181                msg_json["content"] = json!(content);
182            }
183        }
184        if !reasoning_buf.is_empty() && msg.role == "assistant" {
185            msg_json["reasoning_content"] = json!(reasoning_buf);
186        }
187
188        messages.push(msg_json);
189    }
190
191    // Build tools
192    let tools: Vec<serde_json::Value> = req
193        .tools
194        .iter()
195        .map(|t| {
196            json!({
197                "type": "function",
198                "function": {
199                    "name": t.name,
200                    "description": t.description,
201                    "parameters": t.input_schema,
202                }
203            })
204        })
205        .collect();
206
207    let mut body = json!({
208        "model": model,
209        "messages": messages,
210        "stream": true,
211        "stream_options": {
212            "include_usage": true
213        },
214        "temperature": req.temperature,
215    });
216
217    if req.max_tokens > 0 {
218        body["max_tokens"] = json!(req.max_tokens);
219    }
220    if !tools.is_empty() {
221        body["tools"] = json!(tools);
222    }
223    if !req.stop.is_empty() {
224        body["stop"] = json!(req.stop);
225    }
226    // NOTE: we deliberately never emit `prompt_cache_key` / `prompt_cache_retention`
227    // here. This one adapter fronts dozens of OpenAI-compatible endpoints and
228    // proxies (opencode-go, NVIDIA NIM, Groq, stepfun, Ollama, …). Many reject
229    // unknown parameters with HTTP 400 — e.g. opencode-go:
230    //   "Validation: Unsupported parameter(s): prompt_cache_retention,
231    //    prompt_cache_key"
232    // which made the first model in every routing chain fail and waste a turn.
233    // Prompt caching stays an Anthropic-only feature (handled in anthropic.rs,
234    // which keys off `req.cache.enabled` independently). The engine may still
235    // set `req.cache.enabled` for the run; this adapter simply ignores it.
236
237    body
238}
239
240fn image_source_url(source: &super::ImageSource) -> String {
241    match source {
242        super::ImageSource::Base64 { media_type, data } => {
243            format!("data:{};base64,{}", media_type, data)
244        }
245        super::ImageSource::Url { url } => url.clone(),
246    }
247}
248
249#[async_trait]
250impl Brain for OpenAICompatAdapter {
251    fn id(&self) -> &str {
252        &self.model
253    }
254
255    fn caps(&self) -> ModelCaps {
256        self.caps.clone()
257    }
258
259    async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
260        let body = build_chat_body(&self.model, &req, self.echo_reasoning);
261
262        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
263
264        let response = self
265            .client
266            .post(&url)
267            .header("Authorization", format!("Bearer {}", self.api_key))
268            .json(&body)
269            .send()
270            .await?;
271
272        if !response.status().is_success() {
273            let status = response.status().as_u16();
274            let body = response.text().await.unwrap_or_default();
275            return Err(anyhow::anyhow!(
276                "OpenAI-compatible API error {}: {}",
277                status,
278                body
279            ));
280        }
281
282        #[derive(Default)]
283        struct ToolCallState {
284            id: String,
285            started: bool,
286        }
287
288        let stream = response.bytes_stream();
289
290        // SSE state: tool-call accumulator + line buffer that survives chunk
291        // boundaries. Without the buffer, a JSON event split across two TCP
292        // chunks was parsed in halves and silently dropped — producing the
293        // "à rebours" → "àours" mangling.
294        struct SseState {
295            tools: HashMap<u64, ToolCallState>,
296            lines: super::sse_buffer::LineBuffer,
297            /// Accumulated assistant `content` text for this completion. Used
298            /// to recover tool calls a provider emitted as inline XML/DSML
299            /// markup inside `content` rather than as native `tool_calls`
300            /// (see provider::tool_markup).
301            content_buf: String,
302            /// True once we've decided the content is inline tool-call markup
303            /// and should be suppressed from the visible text stream.
304            suppress_text: bool,
305            /// Text held while the beginning of `content` is ambiguous: it may
306            /// still become inline tool-call markup once more chunks arrive.
307            pending_text: String,
308            /// B4: true once reasoning has been seen on the streaming `delta`
309            /// path. Providers also repeat the full reasoning under
310            /// `message.reasoning_content` on the final chunk; without this
311            /// flag the engine concatenated both and echoed doubled reasoning
312            /// back (context bloat + 400 risk). We take delta OR message,
313            /// never both.
314            reasoning_seen: bool,
315        }
316
317        let event_stream = stream
318            .scan(
319                SseState {
320                    tools: HashMap::new(),
321                    lines: super::sse_buffer::LineBuffer::new(),
322                    content_buf: String::new(),
323                    suppress_text: false,
324                    pending_text: String::new(),
325                    reasoning_seen: false,
326                },
327                |state, chunk| {
328                    let events: Vec<BrainEvent> = match chunk {
329                        Ok(bytes) => {
330                            let lines = state.lines.push(&bytes);
331                            let tool_state = &mut state.tools;
332                            let mut parsed = Vec::new();
333                            for line in lines {
334                                let line = line.trim();
335                                if line.is_empty() || !line.starts_with("data: ") {
336                                    continue;
337                                }
338                                let data = &line[6..];
339                                if data == "[DONE]" {
340                                    continue;
341                                }
342                                let event: serde_json::Value = match serde_json::from_str(data) {
343                                    Ok(v) => v,
344                                    Err(e) => {
345                                        tracing::debug!(
346                                            "JSON parse error: {} — data: {}",
347                                            e,
348                                            &data[..data.len().min(200)]
349                                        );
350                                        continue;
351                                    }
352                                };
353
354                                if let Some(choices) = event["choices"].as_array() {
355                                    for choice in choices {
356                                        if let Some(delta) = choice["delta"].as_object() {
357                                            if let Some(text) =
358                                                delta.get("content").and_then(|v| v.as_str())
359                                            {
360                                                if !text.is_empty() {
361                                                    state.content_buf.push_str(text);
362                                                    state.pending_text.push_str(text);
363                                                    // If this completion's content turns
364                                                    // out to be inline tool-call markup
365                                                    // (DeepSeek DSML / Anthropic-style
366                                                    // <invoke>), suppress it from the
367                                                    // visible text stream — it'll be
368                                                    // converted to real tool calls at
369                                                    // finish_reason.
370                                                    if !state.suppress_text
371                                                        && super::tool_markup::looks_like_tool_markup(
372                                                            &state.content_buf,
373                                                        )
374                                                    {
375                                                        state.suppress_text = true;
376                                                        state.pending_text.clear();
377                                                    }
378                                                    if !state.suppress_text
379                                                        && !super::tool_markup::could_be_tool_markup_prefix(
380                                                            &state.content_buf,
381                                                        )
382                                                        && !state.pending_text.is_empty()
383                                                    {
384                                                        parsed.push(BrainEvent::TextDelta(
385                                                            std::mem::take(&mut state.pending_text),
386                                                        ));
387                                                    }
388                                                }
389                                            }
390                                            // DeepSeek / Moonshot thinking-mode emit
391                                            // reasoning trace alongside content. Capture
392                                            // it as a dedicated event so the engine can
393                                            // echo it back on the next turn (required
394                                            // by DeepSeek's contract).
395                                            // Several providers report this under
396                                            // different keys; check the known aliases.
397                                            for key in [
398                                                "reasoning_content",
399                                                "reasoning",
400                                                "thinking",
401                                                "thought",
402                                            ] {
403                                                if let Some(rtext) =
404                                                    delta.get(key).and_then(|v| v.as_str())
405                                                {
406                                                    if !rtext.is_empty() {
407                                                        state.reasoning_seen = true;
408                                                        parsed.push(BrainEvent::ReasoningDelta(
409                                                            rtext.to_string(),
410                                                        ));
411                                                    }
412                                                }
413                                            }
414                                        }
415                                        // Some providers bundle the reasoning under
416                                        // `message.reasoning_content` on the final chunk
417                                        // rather than streaming it through `delta`. B4:
418                                        // only use it when nothing streamed via delta —
419                                        // otherwise it's the SAME trace repeated and
420                                        // concatenating both doubles it.
421                                        if !state.reasoning_seen {
422                                            if let Some(msg_obj) =
423                                                choice.get("message").and_then(|v| v.as_object())
424                                            {
425                                                for key in
426                                                    ["reasoning_content", "reasoning", "thinking"]
427                                                {
428                                                    if let Some(rtext) =
429                                                        msg_obj.get(key).and_then(|v| v.as_str())
430                                                    {
431                                                        if !rtext.is_empty() {
432                                                            state.reasoning_seen = true;
433                                                            parsed.push(BrainEvent::ReasoningDelta(
434                                                                rtext.to_string(),
435                                                            ));
436                                                        }
437                                                    }
438                                                }
439                                            }
440                                        }
441                                        if let Some(delta) = choice["delta"].as_object() {
442                                            // (Re-open the original tool_calls block.)
443                                            let _ = delta; // keep this branch syntactically anchored
444                                            if let Some(tool_calls) =
445                                                delta.get("tool_calls").and_then(|v| v.as_array())
446                                            {
447                                                for tc in tool_calls {
448                                                    let idx = tc
449                                                        .get("index")
450                                                        .and_then(|v| v.as_u64())
451                                                        .unwrap_or(0);
452                                                    let id = tc
453                                                        .get("id")
454                                                        .and_then(|v| v.as_str())
455                                                        .map(|s| s.to_string());
456                                                    let state = tool_state.entry(idx).or_default();
457                                                    if let Some(id) = id {
458                                                        state.id = id;
459                                                    }
460                                                    if let Some(func) = tc
461                                                        .get("function")
462                                                        .and_then(|v| v.as_object())
463                                                    {
464                                                        if let Some(name) = func
465                                                            .get("name")
466                                                            .and_then(|v| v.as_str())
467                                                        {
468                                                            if !state.started {
469                                                                if state.id.is_empty() {
470                                                                    // B8: unique even when
471                                                                    // the provider omits the
472                                                                    // id, across turns.
473                                                                    state.id =
474                                                                        next_synth_id("tool");
475                                                                }
476                                                                state.started = true;
477                                                                parsed.push(
478                                                                    BrainEvent::ToolUseStart {
479                                                                        id: state.id.clone(),
480                                                                        name: name.to_string(),
481                                                                    },
482                                                                );
483                                                            }
484                                                        }
485                                                        if let Some(args) = func
486                                                            .get("arguments")
487                                                            .and_then(|v| v.as_str())
488                                                        {
489                                                            if !state.id.is_empty()
490                                                                && !args.is_empty()
491                                                            {
492                                                                parsed.push(
493                                                                    BrainEvent::ToolUseDelta {
494                                                                        id: state.id.clone(),
495                                                                        json: args.to_string(),
496                                                                    },
497                                                                );
498                                                            }
499                                                        }
500                                                    }
501                                                }
502                                            }
503                                        }
504
505                                        if let Some(reason) =
506                                            choice.get("finish_reason").and_then(|v| v.as_str())
507                                        {
508                                            if !reason.is_empty() && reason != "null" {
509                                                let stop = match reason {
510                                                    "stop" => {
511                                                        // A2: a provider may stream native
512                                                        // tool_calls and then finish with
513                                                        // "stop" (not "tool_calls"). Drain
514                                                        // any pending native calls FIRST so
515                                                        // they actually execute instead of
516                                                        // being silently dropped.
517                                                        let mut native = false;
518                                                        for idx in sorted_indices(
519                                                            tool_state.keys().copied(),
520                                                        ) {
521                                                            if let Some(st) =
522                                                                tool_state.remove(&idx)
523                                                            {
524                                                                if !st.id.is_empty() {
525                                                                    parsed.push(
526                                                                        BrainEvent::ToolUseEnd {
527                                                                            id: st.id,
528                                                                        },
529                                                                    );
530                                                                    native = true;
531                                                                }
532                                                            }
533                                                        }
534                                                        // Otherwise recover tool calls a
535                                                        // provider emitted as inline
536                                                        // XML/DSML markup in `content` (with
537                                                        // finish_reason "stop") instead of
538                                                        // native tool_calls — without this
539                                                        // the call leaks as raw text and
540                                                        // never runs.
541                                                        let calls = if !native
542                                                            && super::tool_markup::looks_like_tool_markup(
543                                                                &state.content_buf,
544                                                            )
545                                                        {
546                                                            super::tool_markup::extract_tool_calls(
547                                                                &state.content_buf,
548                                                            )
549                                                        } else {
550                                                            Vec::new()
551                                                        };
552                                                        if native {
553                                                            sparrow_core::event::StopReason::ToolUse
554                                                        } else if calls.is_empty() {
555                                                            if !state.suppress_text
556                                                                && !state.pending_text.is_empty()
557                                                            {
558                                                                parsed.push(
559                                                                    BrainEvent::TextDelta(
560                                                                        std::mem::take(
561                                                                            &mut state.pending_text,
562                                                                        ),
563                                                                    ),
564                                                                );
565                                                            }
566                                                            sparrow_core::event::StopReason::EndTurn
567                                                        } else {
568                                                            for call in calls.into_iter() {
569                                                                // B8: unique id per
570                                                                // synthesized call so two
571                                                                // markup turns in one run
572                                                                // never collide.
573                                                                let id = next_synth_id("markup");
574                                                                parsed.push(
575                                                                    BrainEvent::ToolUseStart {
576                                                                        id: id.clone(),
577                                                                        name: call.name,
578                                                                    },
579                                                                );
580                                                                parsed.push(
581                                                                    BrainEvent::ToolUseDelta {
582                                                                        id: id.clone(),
583                                                                        json: call
584                                                                            .args
585                                                                            .to_string(),
586                                                                    },
587                                                                );
588                                                                parsed.push(
589                                                                    BrainEvent::ToolUseEnd { id },
590                                                                );
591                                                            }
592                                                            sparrow_core::event::StopReason::ToolUse
593                                                        }
594                                                    }
595                                                    "length" => sparrow_core::event::StopReason::MaxTokens,
596                                                    "tool_calls" => {
597                                                        // A1/A2: emit Ends in index order,
598                                                        // not HashMap-arbitrary order.
599                                                        for idx in sorted_indices(
600                                                            tool_state.keys().copied(),
601                                                        ) {
602                                                            if let Some(st) =
603                                                                tool_state.remove(&idx)
604                                                            {
605                                                                if !st.id.is_empty() {
606                                                                    parsed.push(
607                                                                        BrainEvent::ToolUseEnd {
608                                                                            id: st.id,
609                                                                        },
610                                                                    );
611                                                                }
612                                                            }
613                                                        }
614                                                        sparrow_core::event::StopReason::ToolUse
615                                                    }
616                                                    s => sparrow_core::event::StopReason::StopSequence(
617                                                        s.to_string(),
618                                                    ),
619                                                };
620                                                parsed.push(BrainEvent::Done(stop));
621                                            }
622                                        }
623                                    }
624                                }
625
626                                if let Some(usage) = event.get("usage").and_then(|u| u.as_object())
627                                {
628                                    // Use .get() — indexing a serde_json::Map with [] panics on a
629                                    // missing key, and some providers (e.g. MiniMax) omit fields.
630                                    parsed.push(BrainEvent::Usage(sparrow_core::event::TokenUsage {
631                                        input: usage
632                                            .get("prompt_tokens")
633                                            .and_then(|v| v.as_u64())
634                                            .unwrap_or(0),
635                                        output: usage
636                                            .get("completion_tokens")
637                                            .and_then(|v| v.as_u64())
638                                            .unwrap_or(0),
639                                    }));
640                                }
641                            }
642                            parsed
643                        }
644                        Err(e) => vec![BrainEvent::Error(format!("stream error: {}", e))],
645                    };
646                    futures::future::ready(Some(stream::iter(events)))
647                },
648            )
649            .flatten();
650
651        Ok(Box::pin(event_stream))
652    }
653}
654
655#[cfg(test)]
656mod tests {
657    use super::*;
658    use crate::{Msg, PromptCacheConfig, PromptCacheTtl};
659    use futures::StreamExt;
660    use tokio::io::{AsyncReadExt, AsyncWriteExt};
661    use tokio::net::TcpListener;
662
663    #[test]
664    fn openai_chat_body_never_sends_prompt_cache_params() {
665        // Regression for the v0.8.x 400: many OpenAI-compatible proxies
666        // (opencode-go, …) reject `prompt_cache_key`/`prompt_cache_retention`
667        // with HTTP 400, which made the first routed model fail every run.
668        // These params are Anthropic-only (see anthropic.rs); this adapter
669        // must never emit them — even when the run enabled caching.
670        let req = BrainRequest {
671            system: Some("stable sparrow system".into()),
672            messages: vec![Msg {
673                role: "user".into(),
674                content: vec![ContentBlock::Text {
675                    text: "dynamic task".into(),
676                }],
677            }],
678            cache: PromptCacheConfig {
679                enabled: true,
680                ttl: PromptCacheTtl::OneHour,
681                key: Some("sparrow-repo-abc".into()),
682            },
683            ..BrainRequest::default()
684        };
685
686        let body = build_chat_body("gpt-test", &req, true);
687        assert!(
688            body.get("prompt_cache_key").is_none(),
689            "prompt_cache_key must never be sent to an OpenAI-compatible endpoint"
690        );
691        assert!(
692            body.get("prompt_cache_retention").is_none(),
693            "prompt_cache_retention must never be sent to an OpenAI-compatible endpoint"
694        );
695    }
696
697    #[test]
698    fn openai_chat_body_serializes_image_blocks() {
699        let req = BrainRequest {
700            messages: vec![Msg {
701                role: "user".into(),
702                content: vec![
703                    ContentBlock::Text {
704                        text: "what is in this image?".into(),
705                    },
706                    ContentBlock::Image {
707                        source: crate::ImageSource::Base64 {
708                            media_type: "image/png".into(),
709                            data: "iVBORw0KGgo=".into(),
710                        },
711                    },
712                ],
713            }],
714            ..BrainRequest::default()
715        };
716
717        let body = build_chat_body("gpt-test", &req, true);
718        assert_eq!(body["messages"][0]["content"][0]["type"], "text");
719        assert_eq!(body["messages"][0]["content"][1]["type"], "image_url");
720        assert_eq!(
721            body["messages"][0]["content"][1]["image_url"]["url"],
722            "data:image/png;base64,iVBORw0KGgo="
723        );
724    }
725
726    #[test]
727    fn openai_chat_body_reinjects_assistant_reasoning_content() {
728        let req = BrainRequest {
729            messages: vec![Msg {
730                role: "assistant".into(),
731                content: vec![
732                    ContentBlock::Reasoning {
733                        text: "opaque provider reasoning".into(),
734                    },
735                    ContentBlock::Text {
736                        text: "visible answer".into(),
737                    },
738                ],
739            }],
740            ..BrainRequest::default()
741        };
742
743        let body = build_chat_body("deepseek-test", &req, true);
744        assert_eq!(body["messages"][0]["content"], "visible answer");
745        assert_eq!(
746            body["messages"][0]["reasoning_content"],
747            "opaque provider reasoning"
748        );
749    }
750
751    #[test]
752    fn openai_chat_body_can_disable_reasoning_echo() {
753        let req = BrainRequest {
754            messages: vec![Msg {
755                role: "assistant".into(),
756                content: vec![
757                    ContentBlock::Reasoning {
758                        text: "provider-private reasoning".into(),
759                    },
760                    ContentBlock::Text {
761                        text: "visible answer".into(),
762                    },
763                ],
764            }],
765            ..BrainRequest::default()
766        };
767
768        let body = build_chat_body("provider-no-echo", &req, false);
769        assert_eq!(body["messages"][0]["content"], "visible answer");
770        assert!(
771            body["messages"][0].get("reasoning_content").is_none(),
772            "provider flagged echo_reasoning=false must not receive reasoning_content"
773        );
774    }
775
776    #[test]
777    fn multi_tool_turn_is_one_assistant_message_with_reasoning() {
778        // Regression for the v0.5.5 fix: a single model turn that emits N tool
779        // calls must serialize as ONE assistant message carrying
780        // reasoning_content + a tool_calls array of length N. Splitting it into
781        // one message per tool dropped reasoning_content from the 2nd+ calls,
782        // which DeepSeek/Qwen/Moonshot thinking-mode rejects with HTTP 400 and
783        // which aborted multi-file tasks half-way.
784        let req = BrainRequest {
785            messages: vec![Msg {
786                role: "assistant".into(),
787                content: vec![
788                    ContentBlock::Reasoning {
789                        text: "thinking about two files".into(),
790                    },
791                    ContentBlock::ToolUse {
792                        id: "call_0".into(),
793                        name: "fs_write".into(),
794                        input: serde_json::json!({"path": "reverse.py"}),
795                    },
796                    ContentBlock::ToolUse {
797                        id: "call_1".into(),
798                        name: "fs_write".into(),
799                        input: serde_json::json!({"path": "test_reverse.py"}),
800                    },
801                ],
802            }],
803            ..BrainRequest::default()
804        };
805
806        let body = build_chat_body("deepseek-test", &req, true);
807        // exactly one assistant message
808        assert_eq!(body["messages"].as_array().unwrap().len(), 1);
809        // reasoning_content present on it
810        assert_eq!(
811            body["messages"][0]["reasoning_content"],
812            "thinking about two files"
813        );
814        // both tool calls in a single tool_calls array
815        let calls = body["messages"][0]["tool_calls"].as_array().unwrap();
816        assert_eq!(calls.len(), 2);
817        assert_eq!(calls[0]["id"], "call_0");
818        assert_eq!(calls[1]["id"], "call_1");
819        assert_eq!(calls[0]["function"]["name"], "fs_write");
820    }
821
822    #[tokio::test]
823    async fn b1_partial_markup_stream_never_emits_visible_text() {
824        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
825        let addr = listener.local_addr().unwrap();
826        let server = tokio::spawn(async move {
827            let (mut socket, _) = listener.accept().await.unwrap();
828            let mut buf = [0_u8; 4096];
829            let _ = socket.read(&mut buf).await.unwrap();
830            let chunks = [
831                "<",
832                "||DSML||invoke name=\"read\">",
833                "<||DSML||parameter name=\"file_path\" string=\"true\">",
834                "config.py",
835                "</||DSML||parameter>",
836                "</||DSML||invoke>",
837            ];
838            let mut body = String::new();
839            for chunk in chunks {
840                body.push_str("data: ");
841                body.push_str(
842                    &serde_json::json!({
843                        "choices": [{
844                            "delta": {"content": chunk},
845                            "finish_reason": null
846                        }]
847                    })
848                    .to_string(),
849                );
850                body.push_str("\n\n");
851            }
852            body.push_str("data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n");
853            let response = format!(
854                "HTTP/1.1 200 OK\r\ncontent-type: text/event-stream\r\ncontent-length: {}\r\n\r\n{}",
855                body.len(),
856                body
857            );
858            socket.write_all(response.as_bytes()).await.unwrap();
859        });
860
861        let adapter =
862            OpenAICompatAdapter::new("deepseek-test", "test-key", &format!("http://{}", addr));
863        let mut stream = adapter.complete(BrainRequest::default()).await.unwrap();
864
865        let mut text = String::new();
866        let mut tool_name = None;
867        let mut tool_args = String::new();
868        let mut done = None;
869        while let Some(event) = stream.next().await {
870            match event {
871                BrainEvent::TextDelta(delta) => text.push_str(&delta),
872                BrainEvent::ToolUseStart { name, .. } => tool_name = Some(name),
873                BrainEvent::ToolUseDelta { json, .. } => tool_args.push_str(&json),
874                BrainEvent::ToolUseEnd { .. } => {}
875                BrainEvent::Done(reason) => done = Some(reason),
876                other => panic!("unexpected event: {other:?}"),
877            }
878        }
879        server.await.unwrap();
880
881        assert_eq!(
882            text, "",
883            "partial inline markup must not leak as visible text"
884        );
885        assert_eq!(tool_name.as_deref(), Some("read"));
886        let args: serde_json::Value = serde_json::from_str(&tool_args).unwrap();
887        assert_eq!(args["file_path"], "config.py");
888        assert!(matches!(done, Some(sparrow_core::event::StopReason::ToolUse)));
889    }
890}