Skip to main content

sparrow/provider/
openai_compat.rs

1use async_trait::async_trait;
2use futures::stream::{self, StreamExt};
3use reqwest::Client;
4use serde_json::json;
5use std::collections::HashMap;
6
7use super::{Brain, BrainEvent, BrainRequest, BrainStream, ContentBlock, LatencyClass, ModelCaps};
8
/// Process-wide, monotonically increasing counter behind synthesized tool-call
/// ids (B8). Markup-derived calls and native calls that arrive without an id
/// each draw a fresh number from it, so two turns in one run can't both end up
/// as `markup-call-0` and confuse id-keyed approval/replay state.
static SYNTH_TOOL_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

/// Returns the next synthesized tool-call id, formatted as `{kind}-call-{n}`.
///
/// `n` is the counter value *before* the increment, and the counter is shared
/// by every `kind`, so ids of different kinds never reuse a number either.
fn next_synth_id(kind: &str) -> String {
    use std::sync::atomic::Ordering;

    // The atomic read-modify-write alone guarantees each caller a distinct
    // number; nothing else is published through this counter.
    let seq = SYNTH_TOOL_ID.fetch_add(1, Ordering::Relaxed);
    format!("{}-call-{}", kind, seq)
}
18
/// Collects the tool-call accumulator's indices and returns them in ascending
/// order. Callers use this to emit `ToolUseEnd` in the order the model declared
/// the calls (index order) rather than the arbitrary order a `HashMap` yields
/// its keys in (A1/A2).
fn sorted_indices(keys: impl Iterator<Item = u64>) -> Vec<u64> {
    let mut ordered = Vec::new();
    ordered.extend(keys);
    // Plain integers: stability is irrelevant, so the unstable sort is fine.
    ordered.sort_unstable();
    ordered
}
27
28/// OpenAI-compatible adapter. Covers OpenAI, Groq, NVIDIA NIM, Together, Cerebras,
29/// OpenRouter, NovitaAI, Nous Portal, HuggingFace, Ollama, and custom endpoints.
30pub struct OpenAICompatAdapter {
31    model: String,
32    api_key: String,
33    base_url: String,
34    client: Client,
35    caps: ModelCaps,
36    echo_reasoning: bool,
37}
38
39impl OpenAICompatAdapter {
40    pub fn new(model: &str, api_key: impl Into<String>, base_url: &str) -> Self {
41        let model = model.to_string();
42        Self {
43            model,
44            api_key: api_key.into(),
45            base_url: base_url.to_string(),
46            client: Client::new(),
47            caps: ModelCaps::default(),
48            echo_reasoning: true,
49        }
50    }
51
52    pub fn with_caps(mut self, caps: ModelCaps) -> Self {
53        self.caps = caps;
54        self
55    }
56
57    pub fn with_echo_reasoning(mut self, echo_reasoning: bool) -> Self {
58        self.echo_reasoning = echo_reasoning;
59        self
60    }
61
62    /// Create an Ollama adapter (OpenAI-compatible API on localhost)
63    pub fn ollama(model: &str, base_url: &str) -> Self {
64        // Ollama doesn't require an API key
65        Self::new(model, "ollama", base_url).with_caps(ModelCaps {
66            context_window: 32_768,
67            max_output: 8_000,
68            tools: true,
69            vision: false,
70            cost_input_per_mtok: 0.0,
71            cost_output_per_mtok: 0.0,
72            latency: LatencyClass::Medium,
73        })
74    }
75}
76
77fn build_chat_body(model: &str, req: &BrainRequest, echo_reasoning: bool) -> serde_json::Value {
78    let mut messages: Vec<serde_json::Value> = Vec::new();
79
80    // Add system message
81    if let Some(sys) = &req.system {
82        messages.push(json!({
83            "role": "system",
84            "content": sys,
85        }));
86    }
87
88    // Convert messages
89    for msg in &req.messages {
90        if msg.role == "system" {
91            messages.push(json!({
92                "role": "system",
93                "content": msg.content.iter()
94                    .filter_map(|b| match b {
95                        ContentBlock::Text { text } => Some(text.clone()),
96                        _ => None,
97                    })
98                    .collect::<Vec<_>>()
99                    .join("\n"),
100            }));
101            continue;
102        }
103
104        let mut content: Vec<serde_json::Value> = Vec::new();
105        let mut tool_calls: Vec<serde_json::Value> = Vec::new();
106        let mut reasoning_buf = String::new();
107        let mut emitted_tool_result = false;
108
109        for block in &msg.content {
110            match block {
111                ContentBlock::Text { text } => {
112                    content.push(json!({"type": "text", "text": text}));
113                }
114                ContentBlock::Image { source } => {
115                    content.push(json!({
116                        "type": "image_url",
117                        "image_url": {
118                            "url": image_source_url(source),
119                        }
120                    }));
121                }
122                ContentBlock::Reasoning { text } if echo_reasoning => {
123                    // DeepSeek / Moonshot / Qwen "thinking mode" require the
124                    // model's previous reasoning_content to be echoed back
125                    // on the next turn or the API rejects with 400. We aggregate
126                    // all reasoning blocks of this message and ship them as a
127                    // single `reasoning_content` field.
128                    if !reasoning_buf.is_empty() {
129                        reasoning_buf.push('\n');
130                    }
131                    reasoning_buf.push_str(text);
132                }
133                ContentBlock::Reasoning { .. } => {}
134                ContentBlock::ToolUse { id, name, input } => {
135                    tool_calls.push(json!({
136                        "id": id,
137                        "type": "function",
138                        "function": {
139                            "name": name,
140                            "arguments": serde_json::to_string(input).unwrap_or_default(),
141                        }
142                    }));
143                }
144                ContentBlock::ToolResult {
145                    tool_use_id,
146                    content: tool_content,
147                    ..
148                } => {
149                    let text = tool_content
150                        .iter()
151                        .filter_map(|b| match b {
152                            ContentBlock::Text { text } => Some(text.clone()),
153                            _ => None,
154                        })
155                        .collect::<Vec<_>>()
156                        .join("\n");
157                    messages.push(json!({
158                        "role": "tool",
159                        "tool_call_id": tool_use_id,
160                        "content": text,
161                    }));
162                    emitted_tool_result = true;
163                    continue; // tool results are separate messages
164                }
165            }
166        }
167
168        if emitted_tool_result && content.is_empty() && tool_calls.is_empty() {
169            continue;
170        }
171
172        let mut msg_json = json!({ "role": msg.role });
173
174        if !tool_calls.is_empty() {
175            msg_json["tool_calls"] = json!(tool_calls);
176        }
177        if !content.is_empty() {
178            if content.len() == 1 && content[0]["type"] == "text" {
179                msg_json["content"] = json!(content[0]["text"]);
180            } else {
181                msg_json["content"] = json!(content);
182            }
183        }
184        if !reasoning_buf.is_empty() && msg.role == "assistant" {
185            msg_json["reasoning_content"] = json!(reasoning_buf);
186        }
187
188        messages.push(msg_json);
189    }
190
191    // Build tools
192    let tools: Vec<serde_json::Value> = req
193        .tools
194        .iter()
195        .map(|t| {
196            json!({
197                "type": "function",
198                "function": {
199                    "name": t.name,
200                    "description": t.description,
201                    "parameters": t.input_schema,
202                }
203            })
204        })
205        .collect();
206
207    let mut body = json!({
208        "model": model,
209        "messages": messages,
210        "stream": true,
211        "stream_options": {
212            "include_usage": true
213        },
214        "temperature": req.temperature,
215    });
216
217    if req.max_tokens > 0 {
218        body["max_tokens"] = json!(req.max_tokens);
219    }
220    if !tools.is_empty() {
221        body["tools"] = json!(tools);
222    }
223    if !req.stop.is_empty() {
224        body["stop"] = json!(req.stop);
225    }
226    if req.cache.enabled {
227        if let Some(key) = &req.cache.key {
228            body["prompt_cache_key"] = json!(key);
229        }
230        body["prompt_cache_retention"] = json!(req.cache.ttl.openai_retention());
231    }
232
233    body
234}
235
236fn image_source_url(source: &super::ImageSource) -> String {
237    match source {
238        super::ImageSource::Base64 { media_type, data } => {
239            format!("data:{};base64,{}", media_type, data)
240        }
241        super::ImageSource::Url { url } => url.clone(),
242    }
243}
244
245#[async_trait]
246impl Brain for OpenAICompatAdapter {
247    fn id(&self) -> &str {
248        &self.model
249    }
250
251    fn caps(&self) -> ModelCaps {
252        self.caps.clone()
253    }
254
255    async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
256        let body = build_chat_body(&self.model, &req, self.echo_reasoning);
257
258        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
259
260        let response = self
261            .client
262            .post(&url)
263            .header("Authorization", format!("Bearer {}", self.api_key))
264            .json(&body)
265            .send()
266            .await?;
267
268        if !response.status().is_success() {
269            let status = response.status().as_u16();
270            let body = response.text().await.unwrap_or_default();
271            return Err(anyhow::anyhow!(
272                "OpenAI-compatible API error {}: {}",
273                status,
274                body
275            ));
276        }
277
278        #[derive(Default)]
279        struct ToolCallState {
280            id: String,
281            started: bool,
282        }
283
284        let stream = response.bytes_stream();
285
286        // SSE state: tool-call accumulator + line buffer that survives chunk
287        // boundaries. Without the buffer, a JSON event split across two TCP
288        // chunks was parsed in halves and silently dropped — producing the
289        // "à rebours" → "àours" mangling.
290        struct SseState {
291            tools: HashMap<u64, ToolCallState>,
292            lines: super::sse_buffer::LineBuffer,
293            /// Accumulated assistant `content` text for this completion. Used
294            /// to recover tool calls a provider emitted as inline XML/DSML
295            /// markup inside `content` rather than as native `tool_calls`
296            /// (see provider::tool_markup).
297            content_buf: String,
298            /// True once we've decided the content is inline tool-call markup
299            /// and should be suppressed from the visible text stream.
300            suppress_text: bool,
301            /// Text held while the beginning of `content` is ambiguous: it may
302            /// still become inline tool-call markup once more chunks arrive.
303            pending_text: String,
304            /// B4: true once reasoning has been seen on the streaming `delta`
305            /// path. Providers also repeat the full reasoning under
306            /// `message.reasoning_content` on the final chunk; without this
307            /// flag the engine concatenated both and echoed doubled reasoning
308            /// back (context bloat + 400 risk). We take delta OR message,
309            /// never both.
310            reasoning_seen: bool,
311        }
312
313        let event_stream = stream
314            .scan(
315                SseState {
316                    tools: HashMap::new(),
317                    lines: super::sse_buffer::LineBuffer::new(),
318                    content_buf: String::new(),
319                    suppress_text: false,
320                    pending_text: String::new(),
321                    reasoning_seen: false,
322                },
323                |state, chunk| {
324                    let events: Vec<BrainEvent> = match chunk {
325                        Ok(bytes) => {
326                            let lines = state.lines.push(&bytes);
327                            let tool_state = &mut state.tools;
328                            let mut parsed = Vec::new();
329                            for line in lines {
330                                let line = line.trim();
331                                if line.is_empty() || !line.starts_with("data: ") {
332                                    continue;
333                                }
334                                let data = &line[6..];
335                                if data == "[DONE]" {
336                                    continue;
337                                }
338                                let event: serde_json::Value = match serde_json::from_str(data) {
339                                    Ok(v) => v,
340                                    Err(e) => {
341                                        tracing::debug!(
342                                            "JSON parse error: {} — data: {}",
343                                            e,
344                                            &data[..data.len().min(200)]
345                                        );
346                                        continue;
347                                    }
348                                };
349
350                                if let Some(choices) = event["choices"].as_array() {
351                                    for choice in choices {
352                                        if let Some(delta) = choice["delta"].as_object() {
353                                            if let Some(text) =
354                                                delta.get("content").and_then(|v| v.as_str())
355                                            {
356                                                if !text.is_empty() {
357                                                    state.content_buf.push_str(text);
358                                                    state.pending_text.push_str(text);
359                                                    // If this completion's content turns
360                                                    // out to be inline tool-call markup
361                                                    // (DeepSeek DSML / Anthropic-style
362                                                    // <invoke>), suppress it from the
363                                                    // visible text stream — it'll be
364                                                    // converted to real tool calls at
365                                                    // finish_reason.
366                                                    if !state.suppress_text
367                                                        && super::tool_markup::looks_like_tool_markup(
368                                                            &state.content_buf,
369                                                        )
370                                                    {
371                                                        state.suppress_text = true;
372                                                        state.pending_text.clear();
373                                                    }
374                                                    if !state.suppress_text
375                                                        && !super::tool_markup::could_be_tool_markup_prefix(
376                                                            &state.content_buf,
377                                                        )
378                                                        && !state.pending_text.is_empty()
379                                                    {
380                                                        parsed.push(BrainEvent::TextDelta(
381                                                            std::mem::take(&mut state.pending_text),
382                                                        ));
383                                                    }
384                                                }
385                                            }
386                                            // DeepSeek / Moonshot thinking-mode emit
387                                            // reasoning trace alongside content. Capture
388                                            // it as a dedicated event so the engine can
389                                            // echo it back on the next turn (required
390                                            // by DeepSeek's contract).
391                                            // Several providers report this under
392                                            // different keys; check the known aliases.
393                                            for key in [
394                                                "reasoning_content",
395                                                "reasoning",
396                                                "thinking",
397                                                "thought",
398                                            ] {
399                                                if let Some(rtext) =
400                                                    delta.get(key).and_then(|v| v.as_str())
401                                                {
402                                                    if !rtext.is_empty() {
403                                                        state.reasoning_seen = true;
404                                                        parsed.push(BrainEvent::ReasoningDelta(
405                                                            rtext.to_string(),
406                                                        ));
407                                                    }
408                                                }
409                                            }
410                                        }
411                                        // Some providers bundle the reasoning under
412                                        // `message.reasoning_content` on the final chunk
413                                        // rather than streaming it through `delta`. B4:
414                                        // only use it when nothing streamed via delta —
415                                        // otherwise it's the SAME trace repeated and
416                                        // concatenating both doubles it.
417                                        if !state.reasoning_seen {
418                                            if let Some(msg_obj) =
419                                                choice.get("message").and_then(|v| v.as_object())
420                                            {
421                                                for key in
422                                                    ["reasoning_content", "reasoning", "thinking"]
423                                                {
424                                                    if let Some(rtext) =
425                                                        msg_obj.get(key).and_then(|v| v.as_str())
426                                                    {
427                                                        if !rtext.is_empty() {
428                                                            state.reasoning_seen = true;
429                                                            parsed.push(BrainEvent::ReasoningDelta(
430                                                                rtext.to_string(),
431                                                            ));
432                                                        }
433                                                    }
434                                                }
435                                            }
436                                        }
437                                        if let Some(delta) = choice["delta"].as_object() {
438                                            // (Re-open the original tool_calls block.)
439                                            let _ = delta; // keep this branch syntactically anchored
440                                            if let Some(tool_calls) =
441                                                delta.get("tool_calls").and_then(|v| v.as_array())
442                                            {
443                                                for tc in tool_calls {
444                                                    let idx = tc
445                                                        .get("index")
446                                                        .and_then(|v| v.as_u64())
447                                                        .unwrap_or(0);
448                                                    let id = tc
449                                                        .get("id")
450                                                        .and_then(|v| v.as_str())
451                                                        .map(|s| s.to_string());
452                                                    let state = tool_state.entry(idx).or_default();
453                                                    if let Some(id) = id {
454                                                        state.id = id;
455                                                    }
456                                                    if let Some(func) = tc
457                                                        .get("function")
458                                                        .and_then(|v| v.as_object())
459                                                    {
460                                                        if let Some(name) = func
461                                                            .get("name")
462                                                            .and_then(|v| v.as_str())
463                                                        {
464                                                            if !state.started {
465                                                                if state.id.is_empty() {
466                                                                    // B8: unique even when
467                                                                    // the provider omits the
468                                                                    // id, across turns.
469                                                                    state.id =
470                                                                        next_synth_id("tool");
471                                                                }
472                                                                state.started = true;
473                                                                parsed.push(
474                                                                    BrainEvent::ToolUseStart {
475                                                                        id: state.id.clone(),
476                                                                        name: name.to_string(),
477                                                                    },
478                                                                );
479                                                            }
480                                                        }
481                                                        if let Some(args) = func
482                                                            .get("arguments")
483                                                            .and_then(|v| v.as_str())
484                                                        {
485                                                            if !state.id.is_empty()
486                                                                && !args.is_empty()
487                                                            {
488                                                                parsed.push(
489                                                                    BrainEvent::ToolUseDelta {
490                                                                        id: state.id.clone(),
491                                                                        json: args.to_string(),
492                                                                    },
493                                                                );
494                                                            }
495                                                        }
496                                                    }
497                                                }
498                                            }
499                                        }
500
501                                        if let Some(reason) =
502                                            choice.get("finish_reason").and_then(|v| v.as_str())
503                                        {
504                                            if !reason.is_empty() && reason != "null" {
505                                                let stop = match reason {
506                                                    "stop" => {
507                                                        // A2: a provider may stream native
508                                                        // tool_calls and then finish with
509                                                        // "stop" (not "tool_calls"). Drain
510                                                        // any pending native calls FIRST so
511                                                        // they actually execute instead of
512                                                        // being silently dropped.
513                                                        let mut native = false;
514                                                        for idx in sorted_indices(
515                                                            tool_state.keys().copied(),
516                                                        ) {
517                                                            if let Some(st) =
518                                                                tool_state.remove(&idx)
519                                                            {
520                                                                if !st.id.is_empty() {
521                                                                    parsed.push(
522                                                                        BrainEvent::ToolUseEnd {
523                                                                            id: st.id,
524                                                                        },
525                                                                    );
526                                                                    native = true;
527                                                                }
528                                                            }
529                                                        }
530                                                        // Otherwise recover tool calls a
531                                                        // provider emitted as inline
532                                                        // XML/DSML markup in `content` (with
533                                                        // finish_reason "stop") instead of
534                                                        // native tool_calls — without this
535                                                        // the call leaks as raw text and
536                                                        // never runs.
537                                                        let calls = if !native
538                                                            && super::tool_markup::looks_like_tool_markup(
539                                                                &state.content_buf,
540                                                            )
541                                                        {
542                                                            super::tool_markup::extract_tool_calls(
543                                                                &state.content_buf,
544                                                            )
545                                                        } else {
546                                                            Vec::new()
547                                                        };
548                                                        if native {
549                                                            crate::event::StopReason::ToolUse
550                                                        } else if calls.is_empty() {
551                                                            if !state.suppress_text
552                                                                && !state.pending_text.is_empty()
553                                                            {
554                                                                parsed.push(
555                                                                    BrainEvent::TextDelta(
556                                                                        std::mem::take(
557                                                                            &mut state.pending_text,
558                                                                        ),
559                                                                    ),
560                                                                );
561                                                            }
562                                                            crate::event::StopReason::EndTurn
563                                                        } else {
564                                                            for call in calls.into_iter() {
565                                                                // B8: unique id per
566                                                                // synthesized call so two
567                                                                // markup turns in one run
568                                                                // never collide.
569                                                                let id = next_synth_id("markup");
570                                                                parsed.push(
571                                                                    BrainEvent::ToolUseStart {
572                                                                        id: id.clone(),
573                                                                        name: call.name,
574                                                                    },
575                                                                );
576                                                                parsed.push(
577                                                                    BrainEvent::ToolUseDelta {
578                                                                        id: id.clone(),
579                                                                        json: call
580                                                                            .args
581                                                                            .to_string(),
582                                                                    },
583                                                                );
584                                                                parsed.push(
585                                                                    BrainEvent::ToolUseEnd { id },
586                                                                );
587                                                            }
588                                                            crate::event::StopReason::ToolUse
589                                                        }
590                                                    }
591                                                    "length" => crate::event::StopReason::MaxTokens,
592                                                    "tool_calls" => {
593                                                        // A1/A2: emit Ends in index order,
594                                                        // not HashMap-arbitrary order.
595                                                        for idx in sorted_indices(
596                                                            tool_state.keys().copied(),
597                                                        ) {
598                                                            if let Some(st) =
599                                                                tool_state.remove(&idx)
600                                                            {
601                                                                if !st.id.is_empty() {
602                                                                    parsed.push(
603                                                                        BrainEvent::ToolUseEnd {
604                                                                            id: st.id,
605                                                                        },
606                                                                    );
607                                                                }
608                                                            }
609                                                        }
610                                                        crate::event::StopReason::ToolUse
611                                                    }
612                                                    s => crate::event::StopReason::StopSequence(
613                                                        s.to_string(),
614                                                    ),
615                                                };
616                                                parsed.push(BrainEvent::Done(stop));
617                                            }
618                                        }
619                                    }
620                                }
621
622                                if let Some(usage) = event.get("usage").and_then(|u| u.as_object())
623                                {
624                                    // Use .get() — indexing a serde_json::Map with [] panics on a
625                                    // missing key, and some providers (e.g. MiniMax) omit fields.
626                                    parsed.push(BrainEvent::Usage(crate::event::TokenUsage {
627                                        input: usage
628                                            .get("prompt_tokens")
629                                            .and_then(|v| v.as_u64())
630                                            .unwrap_or(0),
631                                        output: usage
632                                            .get("completion_tokens")
633                                            .and_then(|v| v.as_u64())
634                                            .unwrap_or(0),
635                                    }));
636                                }
637                            }
638                            parsed
639                        }
640                        Err(e) => vec![BrainEvent::Error(format!("stream error: {}", e))],
641                    };
642                    futures::future::ready(Some(stream::iter(events)))
643                },
644            )
645            .flatten();
646
647        Ok(Box::pin(event_stream))
648    }
649}
650
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{Msg, PromptCacheConfig, PromptCacheTtl};
    use futures::StreamExt;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    /// Wraps `content` in a request that carries exactly one message with the
    /// given `role`; every other request field comes from
    /// `BrainRequest::default()`.
    fn single_message_request(role: &'static str, content: Vec<ContentBlock>) -> BrainRequest {
        let message = Msg {
            role: role.into(),
            content,
        };
        BrainRequest {
            messages: vec![message],
            ..BrainRequest::default()
        }
    }

    /// Shorthand for a plain text content block.
    fn text_block(text: &'static str) -> ContentBlock {
        ContentBlock::Text { text: text.into() }
    }

    /// Shorthand for a reasoning content block.
    fn reasoning_block(text: &'static str) -> ContentBlock {
        ContentBlock::Reasoning { text: text.into() }
    }

    /// With caching enabled and a cache key set, the chat body must carry
    /// `prompt_cache_key` plus a `prompt_cache_retention` of `"in_memory"`.
    #[test]
    fn openai_chat_body_adds_prompt_cache_controls() {
        let mut request = single_message_request("user", vec![text_block("dynamic task")]);
        request.system = Some("stable sparrow system".into());
        request.cache = PromptCacheConfig {
            enabled: true,
            ttl: PromptCacheTtl::OneHour,
            key: Some("sparrow-repo-abc".into()),
        };

        let body = build_chat_body("gpt-test", &request, true);

        assert_eq!(body["prompt_cache_key"], "sparrow-repo-abc");
        assert_eq!(body["prompt_cache_retention"], "in_memory");
    }

    /// A user message mixing text and a base64 image must serialize as a
    /// content array: a `text` part followed by an `image_url` part whose URL
    /// is a `data:` URI built from the media type and payload.
    #[test]
    fn openai_chat_body_serializes_image_blocks() {
        let picture = ContentBlock::Image {
            source: crate::provider::ImageSource::Base64 {
                media_type: "image/png".into(),
                data: "iVBORw0KGgo=".into(),
            },
        };
        let request = single_message_request(
            "user",
            vec![text_block("what is in this image?"), picture],
        );

        let body = build_chat_body("gpt-test", &request, true);
        let parts = &body["messages"][0]["content"];

        assert_eq!(parts[0]["type"], "text");
        assert_eq!(parts[1]["type"], "image_url");
        assert_eq!(
            parts[1]["image_url"]["url"],
            "data:image/png;base64,iVBORw0KGgo="
        );
    }

    /// With reasoning echo enabled, an assistant turn's reasoning block is sent
    /// back as `reasoning_content` while the visible text stays in `content`.
    #[test]
    fn openai_chat_body_reinjects_assistant_reasoning_content() {
        let request = single_message_request(
            "assistant",
            vec![
                reasoning_block("opaque provider reasoning"),
                text_block("visible answer"),
            ],
        );

        let body = build_chat_body("deepseek-test", &request, true);
        let assistant = &body["messages"][0];

        assert_eq!(assistant["content"], "visible answer");
        assert_eq!(
            assistant["reasoning_content"],
            "opaque provider reasoning"
        );
    }

    /// With reasoning echo disabled, the assistant message keeps its visible
    /// text but must not contain a `reasoning_content` key at all.
    #[test]
    fn openai_chat_body_can_disable_reasoning_echo() {
        let request = single_message_request(
            "assistant",
            vec![
                reasoning_block("provider-private reasoning"),
                text_block("visible answer"),
            ],
        );

        let body = build_chat_body("provider-no-echo", &request, false);
        let assistant = &body["messages"][0];

        assert_eq!(assistant["content"], "visible answer");
        assert!(
            assistant.get("reasoning_content").is_none(),
            "provider flagged echo_reasoning=false must not receive reasoning_content"
        );
    }

    /// Regression guard for the v0.5.5 fix. One model turn that issues N tool
    /// calls has to become a single assistant message holding
    /// `reasoning_content` and an N-element `tool_calls` array. Emitting one
    /// message per tool call lost `reasoning_content` on every call after the
    /// first; DeepSeek/Qwen/Moonshot thinking-mode answers that with HTTP 400,
    /// which aborted multi-file tasks half-way.
    #[test]
    fn multi_tool_turn_is_one_assistant_message_with_reasoning() {
        let write_call = |id: &'static str, path: &'static str| ContentBlock::ToolUse {
            id: id.into(),
            name: "fs_write".into(),
            input: serde_json::json!({"path": path}),
        };
        let request = single_message_request(
            "assistant",
            vec![
                reasoning_block("thinking about two files"),
                write_call("call_0", "reverse.py"),
                write_call("call_1", "test_reverse.py"),
            ],
        );

        let body = build_chat_body("deepseek-test", &request, true);

        // The whole turn collapses into a single assistant message ...
        assert_eq!(body["messages"].as_array().unwrap().len(), 1);
        // ... which still carries the reasoning ...
        assert_eq!(
            body["messages"][0]["reasoning_content"],
            "thinking about two files"
        );
        // ... and lists both calls, in order, inside one `tool_calls` array.
        let calls = body["messages"][0]["tool_calls"].as_array().unwrap();
        assert_eq!(calls.len(), 2);
        assert_eq!(calls[0]["id"], "call_0");
        assert_eq!(calls[1]["id"], "call_1");
        assert_eq!(calls[0]["function"]["name"], "fs_write");
    }

    /// Serves a canned SSE response from a local TCP socket in which inline
    /// `DSML` tool-call markup is split across several content deltas (the
    /// opening `<` arrives alone). The adapter must surface no `TextDelta`
    /// text, report a tool call named `read` whose arguments contain
    /// `file_path = "config.py"`, and finish with `StopReason::ToolUse`.
    #[tokio::test]
    async fn b1_partial_markup_stream_never_emits_visible_text() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();

        let fake_provider = tokio::spawn(async move {
            let (mut conn, _) = listener.accept().await.unwrap();
            // Read once from the connection; the request contents are ignored.
            let mut request_buf = [0_u8; 4096];
            let _ = conn.read(&mut request_buf).await.unwrap();

            let fragments = [
                "<",
                "||DSML||invoke name=\"read\">",
                "<||DSML||parameter name=\"file_path\" string=\"true\">",
                "config.py",
                "</||DSML||parameter>",
                "</||DSML||invoke>",
            ];

            // One SSE `data:` frame per markup fragment, then the final
            // frame that carries `finish_reason: "stop"`.
            let mut sse = String::new();
            for fragment in fragments {
                let frame = serde_json::json!({
                    "choices": [{
                        "delta": {"content": fragment},
                        "finish_reason": null
                    }]
                })
                .to_string();
                sse.extend(["data: ", frame.as_str(), "\n\n"]);
            }
            sse.push_str("data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n");

            let reply = format!(
                "HTTP/1.1 200 OK\r\ncontent-type: text/event-stream\r\ncontent-length: {}\r\n\r\n{}",
                sse.len(),
                sse
            );
            conn.write_all(reply.as_bytes()).await.unwrap();
        });

        let base_url = format!("http://{}", local_addr);
        let adapter = OpenAICompatAdapter::new("deepseek-test", "test-key", &base_url);
        let mut events = adapter.complete(BrainRequest::default()).await.unwrap();

        let mut visible_text = String::new();
        let mut last_tool_name = None;
        let mut tool_args_json = String::new();
        let mut stop_reason = None;
        while let Some(event) = events.next().await {
            match event {
                BrainEvent::TextDelta(piece) => visible_text.push_str(&piece),
                BrainEvent::ToolUseStart { name, .. } => last_tool_name = Some(name),
                BrainEvent::ToolUseDelta { json, .. } => tool_args_json.push_str(&json),
                BrainEvent::ToolUseEnd { .. } => {}
                BrainEvent::Done(reason) => stop_reason = Some(reason),
                other => panic!("unexpected event: {other:?}"),
            }
        }
        fake_provider.await.unwrap();

        assert_eq!(
            visible_text, "",
            "partial inline markup must not leak as visible text"
        );
        assert_eq!(last_tool_name.as_deref(), Some("read"));
        let parsed_args: serde_json::Value = serde_json::from_str(&tool_args_json).unwrap();
        assert_eq!(parsed_args["file_path"], "config.py");
        assert!(matches!(
            stop_reason,
            Some(crate::event::StopReason::ToolUse)
        ));
    }
}