sparrow/provider/openai_compat.rs
1use async_trait::async_trait;
2use futures::stream::{self, StreamExt};
3use reqwest::Client;
4use serde_json::json;
5use std::collections::HashMap;
6
7use super::{Brain, BrainEvent, BrainRequest, BrainStream, ContentBlock, LatencyClass, ModelCaps};
8
/// Process-wide monotonic counter backing synthesized tool-call ids (B8).
///
/// Tool calls recovered from inline markup, and native calls whose provider
/// omitted an `id`, each receive a fresh id so two turns in one run can never
/// both become `markup-call-0` and confuse id-keyed approval/replay state.
static SYNTH_TOOL_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

/// Returns a process-unique id of the form `{kind}-call-{n}`.
///
/// `Relaxed` is sufficient: `fetch_add` is an atomic read-modify-write, so
/// every caller receives a distinct counter value regardless of ordering.
fn next_synth_id(kind: &str) -> String {
    use std::sync::atomic::Ordering;

    let seq = SYNTH_TOOL_ID.fetch_add(1, Ordering::Relaxed);
    format!("{}-call-{}", kind, seq)
}
18
/// Returns the keys of a tool-call accumulator in ascending order.
///
/// `ToolUseEnd` events must follow the order in which the model declared the
/// calls (their stream `index`), not whatever order a `HashMap` happens to
/// iterate in (A1/A2).
fn sorted_indices(keys: impl Iterator<Item = u64>) -> Vec<u64> {
    let mut ordered = keys.collect::<Vec<_>>();
    ordered.sort_unstable();
    ordered
}
27
28/// OpenAI-compatible adapter. Covers OpenAI, Groq, NVIDIA NIM, Together, Cerebras,
29/// OpenRouter, NovitaAI, Nous Portal, HuggingFace, Ollama, and custom endpoints.
30pub struct OpenAICompatAdapter {
31 model: String,
32 api_key: String,
33 base_url: String,
34 client: Client,
35 caps: ModelCaps,
36 echo_reasoning: bool,
37}
38
39impl OpenAICompatAdapter {
40 pub fn new(model: &str, api_key: impl Into<String>, base_url: &str) -> Self {
41 let model = model.to_string();
42 Self {
43 model,
44 api_key: api_key.into(),
45 base_url: base_url.to_string(),
46 client: Client::new(),
47 caps: ModelCaps::default(),
48 echo_reasoning: true,
49 }
50 }
51
52 pub fn with_caps(mut self, caps: ModelCaps) -> Self {
53 self.caps = caps;
54 self
55 }
56
57 pub fn with_echo_reasoning(mut self, echo_reasoning: bool) -> Self {
58 self.echo_reasoning = echo_reasoning;
59 self
60 }
61
62 /// Create an Ollama adapter (OpenAI-compatible API on localhost)
63 pub fn ollama(model: &str, base_url: &str) -> Self {
64 // Ollama doesn't require an API key
65 Self::new(model, "ollama", base_url).with_caps(ModelCaps {
66 context_window: 32_768,
67 max_output: 8_000,
68 tools: true,
69 vision: false,
70 cost_input_per_mtok: 0.0,
71 cost_output_per_mtok: 0.0,
72 latency: LatencyClass::Medium,
73 })
74 }
75}
76
77fn build_chat_body(model: &str, req: &BrainRequest, echo_reasoning: bool) -> serde_json::Value {
78 let mut messages: Vec<serde_json::Value> = Vec::new();
79
80 // Add system message
81 if let Some(sys) = &req.system {
82 messages.push(json!({
83 "role": "system",
84 "content": sys,
85 }));
86 }
87
88 // Convert messages
89 for msg in &req.messages {
90 if msg.role == "system" {
91 messages.push(json!({
92 "role": "system",
93 "content": msg.content.iter()
94 .filter_map(|b| match b {
95 ContentBlock::Text { text } => Some(text.clone()),
96 _ => None,
97 })
98 .collect::<Vec<_>>()
99 .join("\n"),
100 }));
101 continue;
102 }
103
104 let mut content: Vec<serde_json::Value> = Vec::new();
105 let mut tool_calls: Vec<serde_json::Value> = Vec::new();
106 let mut reasoning_buf = String::new();
107 let mut emitted_tool_result = false;
108
109 for block in &msg.content {
110 match block {
111 ContentBlock::Text { text } => {
112 content.push(json!({"type": "text", "text": text}));
113 }
114 ContentBlock::Image { source } => {
115 content.push(json!({
116 "type": "image_url",
117 "image_url": {
118 "url": image_source_url(source),
119 }
120 }));
121 }
122 ContentBlock::Reasoning { text } if echo_reasoning => {
123 // DeepSeek / Moonshot / Qwen "thinking mode" require the
124 // model's previous reasoning_content to be echoed back
125 // on the next turn or the API rejects with 400. We aggregate
126 // all reasoning blocks of this message and ship them as a
127 // single `reasoning_content` field.
128 if !reasoning_buf.is_empty() {
129 reasoning_buf.push('\n');
130 }
131 reasoning_buf.push_str(text);
132 }
133 ContentBlock::Reasoning { .. } => {}
134 ContentBlock::ToolUse { id, name, input } => {
135 tool_calls.push(json!({
136 "id": id,
137 "type": "function",
138 "function": {
139 "name": name,
140 "arguments": serde_json::to_string(input).unwrap_or_default(),
141 }
142 }));
143 }
144 ContentBlock::ToolResult {
145 tool_use_id,
146 content: tool_content,
147 ..
148 } => {
149 let text = tool_content
150 .iter()
151 .filter_map(|b| match b {
152 ContentBlock::Text { text } => Some(text.clone()),
153 _ => None,
154 })
155 .collect::<Vec<_>>()
156 .join("\n");
157 messages.push(json!({
158 "role": "tool",
159 "tool_call_id": tool_use_id,
160 "content": text,
161 }));
162 emitted_tool_result = true;
163 continue; // tool results are separate messages
164 }
165 }
166 }
167
168 if emitted_tool_result && content.is_empty() && tool_calls.is_empty() {
169 continue;
170 }
171
172 let mut msg_json = json!({ "role": msg.role });
173
174 if !tool_calls.is_empty() {
175 msg_json["tool_calls"] = json!(tool_calls);
176 }
177 if !content.is_empty() {
178 if content.len() == 1 && content[0]["type"] == "text" {
179 msg_json["content"] = json!(content[0]["text"]);
180 } else {
181 msg_json["content"] = json!(content);
182 }
183 }
184 if !reasoning_buf.is_empty() && msg.role == "assistant" {
185 msg_json["reasoning_content"] = json!(reasoning_buf);
186 }
187
188 messages.push(msg_json);
189 }
190
191 // Build tools
192 let tools: Vec<serde_json::Value> = req
193 .tools
194 .iter()
195 .map(|t| {
196 json!({
197 "type": "function",
198 "function": {
199 "name": t.name,
200 "description": t.description,
201 "parameters": t.input_schema,
202 }
203 })
204 })
205 .collect();
206
207 let mut body = json!({
208 "model": model,
209 "messages": messages,
210 "stream": true,
211 "stream_options": {
212 "include_usage": true
213 },
214 "temperature": req.temperature,
215 });
216
217 if req.max_tokens > 0 {
218 body["max_tokens"] = json!(req.max_tokens);
219 }
220 if !tools.is_empty() {
221 body["tools"] = json!(tools);
222 }
223 if !req.stop.is_empty() {
224 body["stop"] = json!(req.stop);
225 }
226 if req.cache.enabled {
227 if let Some(key) = &req.cache.key {
228 body["prompt_cache_key"] = json!(key);
229 }
230 body["prompt_cache_retention"] = json!(req.cache.ttl.openai_retention());
231 }
232
233 body
234}
235
236fn image_source_url(source: &super::ImageSource) -> String {
237 match source {
238 super::ImageSource::Base64 { media_type, data } => {
239 format!("data:{};base64,{}", media_type, data)
240 }
241 super::ImageSource::Url { url } => url.clone(),
242 }
243}
244
245#[async_trait]
246impl Brain for OpenAICompatAdapter {
247 fn id(&self) -> &str {
248 &self.model
249 }
250
251 fn caps(&self) -> ModelCaps {
252 self.caps.clone()
253 }
254
255 async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
256 let body = build_chat_body(&self.model, &req, self.echo_reasoning);
257
258 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
259
260 let response = self
261 .client
262 .post(&url)
263 .header("Authorization", format!("Bearer {}", self.api_key))
264 .json(&body)
265 .send()
266 .await?;
267
268 if !response.status().is_success() {
269 let status = response.status().as_u16();
270 let body = response.text().await.unwrap_or_default();
271 return Err(anyhow::anyhow!(
272 "OpenAI-compatible API error {}: {}",
273 status,
274 body
275 ));
276 }
277
278 #[derive(Default)]
279 struct ToolCallState {
280 id: String,
281 started: bool,
282 }
283
284 let stream = response.bytes_stream();
285
286 // SSE state: tool-call accumulator + line buffer that survives chunk
287 // boundaries. Without the buffer, a JSON event split across two TCP
288 // chunks was parsed in halves and silently dropped — producing the
289 // "à rebours" → "àours" mangling.
290 struct SseState {
291 tools: HashMap<u64, ToolCallState>,
292 lines: super::sse_buffer::LineBuffer,
293 /// Accumulated assistant `content` text for this completion. Used
294 /// to recover tool calls a provider emitted as inline XML/DSML
295 /// markup inside `content` rather than as native `tool_calls`
296 /// (see provider::tool_markup).
297 content_buf: String,
298 /// True once we've decided the content is inline tool-call markup
299 /// and should be suppressed from the visible text stream.
300 suppress_text: bool,
301 /// Text held while the beginning of `content` is ambiguous: it may
302 /// still become inline tool-call markup once more chunks arrive.
303 pending_text: String,
304 /// B4: true once reasoning has been seen on the streaming `delta`
305 /// path. Providers also repeat the full reasoning under
306 /// `message.reasoning_content` on the final chunk; without this
307 /// flag the engine concatenated both and echoed doubled reasoning
308 /// back (context bloat + 400 risk). We take delta OR message,
309 /// never both.
310 reasoning_seen: bool,
311 }
312
313 let event_stream = stream
314 .scan(
315 SseState {
316 tools: HashMap::new(),
317 lines: super::sse_buffer::LineBuffer::new(),
318 content_buf: String::new(),
319 suppress_text: false,
320 pending_text: String::new(),
321 reasoning_seen: false,
322 },
323 |state, chunk| {
324 let events: Vec<BrainEvent> = match chunk {
325 Ok(bytes) => {
326 let lines = state.lines.push(&bytes);
327 let tool_state = &mut state.tools;
328 let mut parsed = Vec::new();
329 for line in lines {
330 let line = line.trim();
331 if line.is_empty() || !line.starts_with("data: ") {
332 continue;
333 }
334 let data = &line[6..];
335 if data == "[DONE]" {
336 continue;
337 }
338 let event: serde_json::Value = match serde_json::from_str(data) {
339 Ok(v) => v,
340 Err(e) => {
341 tracing::debug!(
342 "JSON parse error: {} — data: {}",
343 e,
344 &data[..data.len().min(200)]
345 );
346 continue;
347 }
348 };
349
350 if let Some(choices) = event["choices"].as_array() {
351 for choice in choices {
352 if let Some(delta) = choice["delta"].as_object() {
353 if let Some(text) =
354 delta.get("content").and_then(|v| v.as_str())
355 {
356 if !text.is_empty() {
357 state.content_buf.push_str(text);
358 state.pending_text.push_str(text);
359 // If this completion's content turns
360 // out to be inline tool-call markup
361 // (DeepSeek DSML / Anthropic-style
362 // <invoke>), suppress it from the
363 // visible text stream — it'll be
364 // converted to real tool calls at
365 // finish_reason.
366 if !state.suppress_text
367 && super::tool_markup::looks_like_tool_markup(
368 &state.content_buf,
369 )
370 {
371 state.suppress_text = true;
372 state.pending_text.clear();
373 }
374 if !state.suppress_text
375 && !super::tool_markup::could_be_tool_markup_prefix(
376 &state.content_buf,
377 )
378 && !state.pending_text.is_empty()
379 {
380 parsed.push(BrainEvent::TextDelta(
381 std::mem::take(&mut state.pending_text),
382 ));
383 }
384 }
385 }
386 // DeepSeek / Moonshot thinking-mode emit
387 // reasoning trace alongside content. Capture
388 // it as a dedicated event so the engine can
389 // echo it back on the next turn (required
390 // by DeepSeek's contract).
391 // Several providers report this under
392 // different keys; check the known aliases.
393 for key in [
394 "reasoning_content",
395 "reasoning",
396 "thinking",
397 "thought",
398 ] {
399 if let Some(rtext) =
400 delta.get(key).and_then(|v| v.as_str())
401 {
402 if !rtext.is_empty() {
403 state.reasoning_seen = true;
404 parsed.push(BrainEvent::ReasoningDelta(
405 rtext.to_string(),
406 ));
407 }
408 }
409 }
410 }
411 // Some providers bundle the reasoning under
412 // `message.reasoning_content` on the final chunk
413 // rather than streaming it through `delta`. B4:
414 // only use it when nothing streamed via delta —
415 // otherwise it's the SAME trace repeated and
416 // concatenating both doubles it.
417 if !state.reasoning_seen {
418 if let Some(msg_obj) =
419 choice.get("message").and_then(|v| v.as_object())
420 {
421 for key in
422 ["reasoning_content", "reasoning", "thinking"]
423 {
424 if let Some(rtext) =
425 msg_obj.get(key).and_then(|v| v.as_str())
426 {
427 if !rtext.is_empty() {
428 state.reasoning_seen = true;
429 parsed.push(BrainEvent::ReasoningDelta(
430 rtext.to_string(),
431 ));
432 }
433 }
434 }
435 }
436 }
437 if let Some(delta) = choice["delta"].as_object() {
438 // (Re-open the original tool_calls block.)
439 let _ = delta; // keep this branch syntactically anchored
440 if let Some(tool_calls) =
441 delta.get("tool_calls").and_then(|v| v.as_array())
442 {
443 for tc in tool_calls {
444 let idx = tc
445 .get("index")
446 .and_then(|v| v.as_u64())
447 .unwrap_or(0);
448 let id = tc
449 .get("id")
450 .and_then(|v| v.as_str())
451 .map(|s| s.to_string());
452 let state = tool_state.entry(idx).or_default();
453 if let Some(id) = id {
454 state.id = id;
455 }
456 if let Some(func) = tc
457 .get("function")
458 .and_then(|v| v.as_object())
459 {
460 if let Some(name) = func
461 .get("name")
462 .and_then(|v| v.as_str())
463 {
464 if !state.started {
465 if state.id.is_empty() {
466 // B8: unique even when
467 // the provider omits the
468 // id, across turns.
469 state.id =
470 next_synth_id("tool");
471 }
472 state.started = true;
473 parsed.push(
474 BrainEvent::ToolUseStart {
475 id: state.id.clone(),
476 name: name.to_string(),
477 },
478 );
479 }
480 }
481 if let Some(args) = func
482 .get("arguments")
483 .and_then(|v| v.as_str())
484 {
485 if !state.id.is_empty()
486 && !args.is_empty()
487 {
488 parsed.push(
489 BrainEvent::ToolUseDelta {
490 id: state.id.clone(),
491 json: args.to_string(),
492 },
493 );
494 }
495 }
496 }
497 }
498 }
499 }
500
501 if let Some(reason) =
502 choice.get("finish_reason").and_then(|v| v.as_str())
503 {
504 if !reason.is_empty() && reason != "null" {
505 let stop = match reason {
506 "stop" => {
507 // A2: a provider may stream native
508 // tool_calls and then finish with
509 // "stop" (not "tool_calls"). Drain
510 // any pending native calls FIRST so
511 // they actually execute instead of
512 // being silently dropped.
513 let mut native = false;
514 for idx in sorted_indices(
515 tool_state.keys().copied(),
516 ) {
517 if let Some(st) =
518 tool_state.remove(&idx)
519 {
520 if !st.id.is_empty() {
521 parsed.push(
522 BrainEvent::ToolUseEnd {
523 id: st.id,
524 },
525 );
526 native = true;
527 }
528 }
529 }
530 // Otherwise recover tool calls a
531 // provider emitted as inline
532 // XML/DSML markup in `content` (with
533 // finish_reason "stop") instead of
534 // native tool_calls — without this
535 // the call leaks as raw text and
536 // never runs.
537 let calls = if !native
538 && super::tool_markup::looks_like_tool_markup(
539 &state.content_buf,
540 )
541 {
542 super::tool_markup::extract_tool_calls(
543 &state.content_buf,
544 )
545 } else {
546 Vec::new()
547 };
548 if native {
549 crate::event::StopReason::ToolUse
550 } else if calls.is_empty() {
551 if !state.suppress_text
552 && !state.pending_text.is_empty()
553 {
554 parsed.push(
555 BrainEvent::TextDelta(
556 std::mem::take(
557 &mut state.pending_text,
558 ),
559 ),
560 );
561 }
562 crate::event::StopReason::EndTurn
563 } else {
564 for call in calls.into_iter() {
565 // B8: unique id per
566 // synthesized call so two
567 // markup turns in one run
568 // never collide.
569 let id = next_synth_id("markup");
570 parsed.push(
571 BrainEvent::ToolUseStart {
572 id: id.clone(),
573 name: call.name,
574 },
575 );
576 parsed.push(
577 BrainEvent::ToolUseDelta {
578 id: id.clone(),
579 json: call
580 .args
581 .to_string(),
582 },
583 );
584 parsed.push(
585 BrainEvent::ToolUseEnd { id },
586 );
587 }
588 crate::event::StopReason::ToolUse
589 }
590 }
591 "length" => crate::event::StopReason::MaxTokens,
592 "tool_calls" => {
593 // A1/A2: emit Ends in index order,
594 // not HashMap-arbitrary order.
595 for idx in sorted_indices(
596 tool_state.keys().copied(),
597 ) {
598 if let Some(st) =
599 tool_state.remove(&idx)
600 {
601 if !st.id.is_empty() {
602 parsed.push(
603 BrainEvent::ToolUseEnd {
604 id: st.id,
605 },
606 );
607 }
608 }
609 }
610 crate::event::StopReason::ToolUse
611 }
612 s => crate::event::StopReason::StopSequence(
613 s.to_string(),
614 ),
615 };
616 parsed.push(BrainEvent::Done(stop));
617 }
618 }
619 }
620 }
621
622 if let Some(usage) = event.get("usage").and_then(|u| u.as_object())
623 {
624 // Use .get() — indexing a serde_json::Map with [] panics on a
625 // missing key, and some providers (e.g. MiniMax) omit fields.
626 parsed.push(BrainEvent::Usage(crate::event::TokenUsage {
627 input: usage
628 .get("prompt_tokens")
629 .and_then(|v| v.as_u64())
630 .unwrap_or(0),
631 output: usage
632 .get("completion_tokens")
633 .and_then(|v| v.as_u64())
634 .unwrap_or(0),
635 }));
636 }
637 }
638 parsed
639 }
640 Err(e) => vec![BrainEvent::Error(format!("stream error: {}", e))],
641 };
642 futures::future::ready(Some(stream::iter(events)))
643 },
644 )
645 .flatten();
646
647 Ok(Box::pin(event_stream))
648 }
649}
650
// Tests for request-body serialization (`build_chat_body`) plus one
// end-to-end streaming test of `complete` against a local mock HTTP server.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{Msg, PromptCacheConfig, PromptCacheTtl};
    use futures::StreamExt;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    // Prompt-cache settings must surface as `prompt_cache_key` and
    // `prompt_cache_retention`; this pins `PromptCacheTtl::OneHour` to the
    // retention string "in_memory".
    #[test]
    fn openai_chat_body_adds_prompt_cache_controls() {
        let req = BrainRequest {
            system: Some("stable sparrow system".into()),
            messages: vec![Msg {
                role: "user".into(),
                content: vec![ContentBlock::Text {
                    text: "dynamic task".into(),
                }],
            }],
            cache: PromptCacheConfig {
                enabled: true,
                ttl: PromptCacheTtl::OneHour,
                key: Some("sparrow-repo-abc".into()),
            },
            ..BrainRequest::default()
        };

        let body = build_chat_body("gpt-test", &req, true);
        assert_eq!(body["prompt_cache_key"], "sparrow-repo-abc");
        assert_eq!(body["prompt_cache_retention"], "in_memory");
    }

    // A text + image message must serialize as a multi-part `content` array
    // with the image rendered as a base64 `data:` URI.
    #[test]
    fn openai_chat_body_serializes_image_blocks() {
        let req = BrainRequest {
            messages: vec![Msg {
                role: "user".into(),
                content: vec![
                    ContentBlock::Text {
                        text: "what is in this image?".into(),
                    },
                    ContentBlock::Image {
                        source: crate::provider::ImageSource::Base64 {
                            media_type: "image/png".into(),
                            data: "iVBORw0KGgo=".into(),
                        },
                    },
                ],
            }],
            ..BrainRequest::default()
        };

        let body = build_chat_body("gpt-test", &req, true);
        assert_eq!(body["messages"][0]["content"][0]["type"], "text");
        assert_eq!(body["messages"][0]["content"][1]["type"], "image_url");
        assert_eq!(
            body["messages"][0]["content"][1]["image_url"]["url"],
            "data:image/png;base64,iVBORw0KGgo="
        );
    }

    // With echo enabled, assistant reasoning is sent back as
    // `reasoning_content` while a single text part collapses to a string.
    #[test]
    fn openai_chat_body_reinjects_assistant_reasoning_content() {
        let req = BrainRequest {
            messages: vec![Msg {
                role: "assistant".into(),
                content: vec![
                    ContentBlock::Reasoning {
                        text: "opaque provider reasoning".into(),
                    },
                    ContentBlock::Text {
                        text: "visible answer".into(),
                    },
                ],
            }],
            ..BrainRequest::default()
        };

        let body = build_chat_body("deepseek-test", &req, true);
        assert_eq!(body["messages"][0]["content"], "visible answer");
        assert_eq!(
            body["messages"][0]["reasoning_content"],
            "opaque provider reasoning"
        );
    }

    // With echo disabled, the reasoning block is dropped entirely.
    #[test]
    fn openai_chat_body_can_disable_reasoning_echo() {
        let req = BrainRequest {
            messages: vec![Msg {
                role: "assistant".into(),
                content: vec![
                    ContentBlock::Reasoning {
                        text: "provider-private reasoning".into(),
                    },
                    ContentBlock::Text {
                        text: "visible answer".into(),
                    },
                ],
            }],
            ..BrainRequest::default()
        };

        let body = build_chat_body("provider-no-echo", &req, false);
        assert_eq!(body["messages"][0]["content"], "visible answer");
        assert!(
            body["messages"][0].get("reasoning_content").is_none(),
            "provider flagged echo_reasoning=false must not receive reasoning_content"
        );
    }

    #[test]
    fn multi_tool_turn_is_one_assistant_message_with_reasoning() {
        // Regression for the v0.5.5 fix: a single model turn that emits N tool
        // calls must serialize as ONE assistant message carrying
        // reasoning_content + a tool_calls array of length N. Splitting it into
        // one message per tool dropped reasoning_content from the 2nd+ calls,
        // which DeepSeek/Qwen/Moonshot thinking-mode rejects with HTTP 400 and
        // which aborted multi-file tasks half-way.
        let req = BrainRequest {
            messages: vec![Msg {
                role: "assistant".into(),
                content: vec![
                    ContentBlock::Reasoning {
                        text: "thinking about two files".into(),
                    },
                    ContentBlock::ToolUse {
                        id: "call_0".into(),
                        name: "fs_write".into(),
                        input: serde_json::json!({"path": "reverse.py"}),
                    },
                    ContentBlock::ToolUse {
                        id: "call_1".into(),
                        name: "fs_write".into(),
                        input: serde_json::json!({"path": "test_reverse.py"}),
                    },
                ],
            }],
            ..BrainRequest::default()
        };

        let body = build_chat_body("deepseek-test", &req, true);
        // exactly one assistant message
        assert_eq!(body["messages"].as_array().unwrap().len(), 1);
        // reasoning_content present on it
        assert_eq!(
            body["messages"][0]["reasoning_content"],
            "thinking about two files"
        );
        // both tool calls in a single tool_calls array
        let calls = body["messages"][0]["tool_calls"].as_array().unwrap();
        assert_eq!(calls.len(), 2);
        assert_eq!(calls[0]["id"], "call_0");
        assert_eq!(calls[1]["id"], "call_1");
        assert_eq!(calls[0]["function"]["name"], "fs_write");
    }

    // B1: DSML tool-call markup streamed one fragment per SSE event (the first
    // event is just "<") must never leak as visible text, and must be
    // converted into a `read` tool call when the stream finishes with "stop".
    #[tokio::test]
    async fn b1_partial_markup_stream_never_emits_visible_text() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        // One-shot mock server: accept a single connection, read the request
        // with one `read` call, then reply with a fixed-length SSE body.
        // NOTE(review): a single read assumes the whole request arrives in one
        // segment; unread request bytes at close could reset the connection.
        let server = tokio::spawn(async move {
            let (mut socket, _) = listener.accept().await.unwrap();
            let mut buf = [0_u8; 4096];
            let _ = socket.read(&mut buf).await.unwrap();
            // Markup split across events so the adapter sees ambiguous prefixes.
            let chunks = [
                "<",
                "||DSML||invoke name=\"read\">",
                "<||DSML||parameter name=\"file_path\" string=\"true\">",
                "config.py",
                "</||DSML||parameter>",
                "</||DSML||invoke>",
            ];
            let mut body = String::new();
            for chunk in chunks {
                body.push_str("data: ");
                body.push_str(
                    &serde_json::json!({
                        "choices": [{
                            "delta": {"content": chunk},
                            "finish_reason": null
                        }]
                    })
                    .to_string(),
                );
                body.push_str("\n\n");
            }
            // Final event: empty delta, finish_reason "stop" (no native tool_calls).
            body.push_str("data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n");
            let response = format!(
                "HTTP/1.1 200 OK\r\ncontent-type: text/event-stream\r\ncontent-length: {}\r\n\r\n{}",
                body.len(),
                body
            );
            socket.write_all(response.as_bytes()).await.unwrap();
        });

        let adapter =
            OpenAICompatAdapter::new("deepseek-test", "test-key", &format!("http://{}", addr));
        let mut stream = adapter.complete(BrainRequest::default()).await.unwrap();

        // Collect everything the adapter emits; any other event kind fails the test.
        let mut text = String::new();
        let mut tool_name = None;
        let mut tool_args = String::new();
        let mut done = None;
        while let Some(event) = stream.next().await {
            match event {
                BrainEvent::TextDelta(delta) => text.push_str(&delta),
                BrainEvent::ToolUseStart { name, .. } => tool_name = Some(name),
                BrainEvent::ToolUseDelta { json, .. } => tool_args.push_str(&json),
                BrainEvent::ToolUseEnd { .. } => {}
                BrainEvent::Done(reason) => done = Some(reason),
                other => panic!("unexpected event: {other:?}"),
            }
        }
        server.await.unwrap();

        assert_eq!(
            text, "",
            "partial inline markup must not leak as visible text"
        );
        assert_eq!(tool_name.as_deref(), Some("read"));
        let args: serde_json::Value = serde_json::from_str(&tool_args).unwrap();
        assert_eq!(args["file_path"], "config.py");
        assert!(matches!(done, Some(crate::event::StopReason::ToolUse)));
    }
}