use async_trait::async_trait;
use futures::StreamExt;
use meerkat_core::schema::{CompiledSchema, SchemaError};
use meerkat_core::{
    AssistantBlock, ContentBlock, ImageData, Message, OutputSchema, StopReason, Usage,
};
use meerkat_llm_core::LlmError;
use meerkat_llm_core::{
    LlmClient, LlmDoneOutcome, LlmEvent, LlmRequest, LlmStream, ToolCallBuffer,
};
use meerkat_llm_core::{http, streaming};
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;

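/// Which OpenAI-compatible wire protocol the remote server speaks: the
/// `/v1/responses` API or the classic `/v1/chat/completions` SSE API.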
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenAiCompatibleMode {
    Responses,
    ChatCompletions,
}

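/// Client for self-hosted, OpenAI-compatible servers.
///
/// In [`OpenAiCompatibleMode::Responses`] requests are delegated to an inner
/// `OpenAiClient`; in [`OpenAiCompatibleMode::ChatCompletions`] this client
/// speaks the SSE chat-completions protocol directly.
///
/// A minimal construction sketch (the URL and model name are illustrative
/// placeholders, not defaults):
///
/// ```ignore
/// let client = OpenAiCompatibleClient::new(
///     OpenAiCompatibleMode::ChatCompletions,
///     "remote-model".to_string(),               // model name on the server
///     "http://localhost:11434/v1".to_string(),  // OpenAI-compatible base URL
///     None,  // bearer token
///     true,  // supports_temperature
///     false, // supports_thinking
///     false, // supports_reasoning
/// );
/// ```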
pub struct OpenAiCompatibleClient {
    mode: OpenAiCompatibleMode,
    remote_model: String,
    bearer_token: Option<String>,
    base_url: String,
    http: reqwest::Client,
    responses_delegate: Option<crate::OpenAiClient>,
    supports_temperature: bool,
    supports_thinking: bool,
    supports_reasoning: bool,
}

impl OpenAiCompatibleClient {
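    /// Builds a client for the given mode. In `Responses` mode a delegate
    /// `OpenAiClient` is constructed up front, pointed at `base_url` with any
    /// trailing `/v1` removed so the delegate does not double the prefix.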
    pub fn new(
        mode: OpenAiCompatibleMode,
        remote_model: String,
        base_url: String,
        bearer_token: Option<String>,
        supports_temperature: bool,
        supports_thinking: bool,
        supports_reasoning: bool,
    ) -> Self {
        let http = http::build_http_client_for_base_url(reqwest::Client::builder(), &base_url)
            .unwrap_or_else(|_| reqwest::Client::new());
        let responses_delegate = matches!(mode, OpenAiCompatibleMode::Responses).then(|| {
            crate::OpenAiClient::new_with_optional_api_key_and_base_url(
                bearer_token.clone(),
                trim_v1_suffix(&base_url),
            )
        });
        Self {
            mode,
            remote_model,
            bearer_token,
            base_url,
            http,
            responses_delegate,
            supports_temperature,
            supports_thinking,
            supports_reasoning,
        }
    }

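    /// Clones the request, substitutes the configured remote model name, and
    /// stamps this server's temperature/reasoning capabilities onto the
    /// OpenAI provider tag so the Responses delegate respects them.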
    fn request_with_remote_model(&self, request: &LlmRequest) -> LlmRequest {
        use meerkat_core::lifecycle::run_primitive::{OpenAiProviderTag, ProviderTag};
        let mut request = request.clone();
        request.model = self.remote_model.clone();
        let mut tag = match request.provider_params.take() {
            Some(ProviderTag::OpenAi(t)) => t,
            Some(_) | None => OpenAiProviderTag::default(),
        };
        tag.supports_temperature_override = Some(self.supports_temperature);
        tag.supports_reasoning_override = Some(self.supports_reasoning);
        request.provider_params = Some(ProviderTag::OpenAi(tag));
        request
    }

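    /// Maps transport-level `reqwest` failures to `LlmError`. Note that the
    /// timeout duration reported here is a fixed 30s placeholder rather than
    /// the actual configured timeout.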
    fn map_send_error(error: reqwest::Error) -> LlmError {
        if error.is_timeout() {
            LlmError::NetworkTimeout { duration_ms: 30000 }
        } else if Self::is_connection_error(&error) {
            LlmError::ConnectionReset
        } else {
            LlmError::Unknown {
                message: error.to_string(),
            }
        }
    }

    #[cfg(not(target_arch = "wasm32"))]
    fn is_connection_error(error: &reqwest::Error) -> bool {
        error.is_connect()
    }

    #[cfg(target_arch = "wasm32")]
    fn is_connection_error(_error: &reqwest::Error) -> bool {
        false
    }

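    /// Builds the streaming `/chat/completions` request body, gating
    /// temperature, reasoning, and thinking parameters on the server's
    /// capability flags and mapping any structured-output schema onto the
    /// `json_schema` response format.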
    fn build_chat_completions_body(&self, request: &LlmRequest) -> Result<Value, LlmError> {
        let mut body = serde_json::json!({
            "model": self.remote_model,
            "messages": Self::convert_to_chat_messages(&request.messages)?,
            "stream": true,
            "stream_options": { "include_usage": true },
            "max_completion_tokens": request.max_tokens,
        });

        if self.supports_temperature
            && let Some(temp) = request.temperature
            && let Some(num) = serde_json::Number::from_f64(temp as f64)
        {
            body["temperature"] = Value::Number(num);
        }

        if !request.tools.is_empty() {
            body["tools"] = Value::Array(
                request
                    .tools
                    .iter()
                    .map(|tool| {
                        serde_json::json!({
                            "type": "function",
                            "function": {
                                "name": tool.name,
                                "description": tool.description,
                                "parameters": tool.input_schema
                            }
                        })
                    })
                    .collect(),
            );
        }

        if let Some(tag) = crate::client::openai_tag(request) {
            use meerkat_core::lifecycle::run_primitive::ReasoningEffort as TypedReasoningEffort;
            if self.supports_reasoning {
                if let Some(reasoning) = tag.reasoning.as_ref() {
                    let v = reasoning.as_value();
                    if v.is_object() {
                        body["reasoning"] = v;
                    }
                }
                if let Some(effort) = tag.reasoning_effort {
                    let s = match effort {
                        TypedReasoningEffort::Low => "low",
                        TypedReasoningEffort::Medium => "medium",
                        TypedReasoningEffort::High => "high",
                    };
                    if !body["reasoning"].is_object() {
                        body["reasoning"] = serde_json::json!({});
                    }
                    body["reasoning"]["effort"] = Value::String(s.to_string());
                    body["reasoning_effort"] = Value::String(s.to_string());
                }
                if self.supports_thinking
                    && let Some(chat_template_kwargs) = tag.chat_template_kwargs.as_ref()
                {
                    body["chat_template_kwargs"] = chat_template_kwargs.as_value();
                }
                if self.supports_thinking
                    && let Some(thinking) = tag.thinking.as_ref()
                {
                    body["thinking"] = thinking.as_value();
                }
            }
            if let Some(output_schema) = tag.structured_output.as_ref() {
                let compiled =
                    self.compile_schema(output_schema)
                        .map_err(|e| LlmError::InvalidRequest {
                            message: e.to_string(),
                        })?;
                body["response_format"] = serde_json::json!({
                    "type": "json_schema",
                    "json_schema": {
                        "name": output_schema.name.as_deref().unwrap_or("output"),
                        "schema": compiled.schema,
                        "strict": output_schema.strict
                    }
                });
            }
        }

        Ok(body)
    }

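    /// Converts core `Message` values into chat-completions message objects.
    /// Inline images become `image_url` data URIs; blob images and other
    /// non-text blocks fall back to their text projection.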
    fn convert_to_chat_messages(messages: &[Message]) -> Result<Vec<Value>, LlmError> {
        let mut out = Vec::new();
        for message in messages {
            match message {
                Message::System(system) => {
                    out.push(serde_json::json!({
                        "role": "system",
                        "content": system.content
                    }));
                }
                Message::SystemNotice(notice) => {
                    out.push(serde_json::json!({
                        "role": "user",
                        "content": notice.rendered_text()
                    }));
                }
                Message::User(user) => {
                    if meerkat_core::has_non_text_content(&user.content) {
                        let content: Vec<Value> = user
                            .content
                            .iter()
                            .map(|block| match block {
                                ContentBlock::Text { text } => serde_json::json!({
                                    "type": "text",
                                    "text": text
                                }),
                                ContentBlock::Image { media_type, data } => match data {
                                    ImageData::Inline { data } => serde_json::json!({
                                        "type": "image_url",
                                        "image_url": {
                                            "url": format!("data:{media_type};base64,{data}")
                                        }
                                    }),
                                    ImageData::Blob { .. } => serde_json::json!({
                                        "type": "text",
                                        "text": block.text_projection()
                                    }),
                                },
                                _ => serde_json::json!({
                                    "type": "text",
                                    "text": block.text_projection()
                                }),
                            })
                            .collect();
                        out.push(serde_json::json!({
                            "role": "user",
                            "content": content
                        }));
                    } else {
                        out.push(serde_json::json!({
                            "role": "user",
                            "content": user.text_content()
                        }));
                    }
                }
                Message::Assistant(assistant) => {
                    let tool_calls: Vec<Value> = assistant
                        .tool_calls
                        .iter()
                        .map(|tool_call| {
                            serde_json::json!({
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": tool_call.name,
                                    "arguments": tool_call.args.to_string(),
                                }
                            })
                        })
                        .collect();
                    out.push(serde_json::json!({
                        "role": "assistant",
                        "content": if assistant.content.is_empty() {
                            Value::Null
                        } else {
                            Value::String(assistant.content.clone())
                        },
                        "tool_calls": tool_calls
                    }));
                }
                Message::BlockAssistant(assistant) => {
                    let mut text_parts = Vec::new();
                    let mut tool_calls = Vec::new();
                    for block in &assistant.blocks {
                        match block {
                            AssistantBlock::Text { text, .. } => {
                                if !text.is_empty() {
                                    text_parts.push(text.clone());
                                }
                            }
                            AssistantBlock::ToolUse { id, name, args, .. } => {
                                tool_calls.push(serde_json::json!({
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "name": name,
                                        "arguments": args.get(),
                                    }
                                }));
                            }
                            _ => {}
                        }
                    }
                    out.push(serde_json::json!({
                        "role": "assistant",
                        "content": if text_parts.is_empty() {
                            Value::Null
                        } else {
                            Value::String(text_parts.join("\n"))
                        },
                        "tool_calls": tool_calls
                    }));
                }
                Message::ToolResults { results, .. } => {
                    for result in results {
                        out.push(serde_json::json!({
                            "role": "tool",
                            "tool_call_id": result.tool_use_id,
                            "content": result.text_content()
                        }));
                    }
                }
            }
        }
        Ok(out)
    }

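    /// Sets the `Content-Type` header and, when a bearer token is configured,
    /// an `Authorization: Bearer ...` header.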
    fn apply_auth(
        &self,
        request: reqwest::RequestBuilder,
        content_type: &'static str,
    ) -> reqwest::RequestBuilder {
        let request = request.header("Content-Type", content_type);
        if let Some(token) = &self.bearer_token {
            request.header("Authorization", format!("Bearer {token}"))
        } else {
            request
        }
    }

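    /// Parses a single SSE line, accepting both `data: ` and `data:`
    /// prefixes. Returns `Ok(None)` for non-data lines and for the `[DONE]`
    /// sentinel.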
    fn parse_chat_completions_line(line: &str) -> Result<Option<ChatCompletionsChunk>, LlmError> {
        if let Some(data) = line
            .strip_prefix("data: ")
            .or_else(|| line.strip_prefix("data:"))
        {
            if data == "[DONE]" {
                return Ok(None);
            }
            serde_json::from_str(data)
                .map(Some)
                .map_err(|err| LlmError::StreamParseError {
                    message: format!("failed to parse chat completions chunk: {err}; line={data}"),
                })
        } else {
            Ok(None)
        }
    }
}

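/// Strips a trailing `/` and `/v1` from a base URL so the Responses delegate,
/// which appends its own `/v1` segment, does not produce a doubled `/v1/v1`
/// prefix. A couple of illustrative cases:
///
/// ```ignore
/// assert_eq!(trim_v1_suffix("http://localhost:11434/v1"), "http://localhost:11434");
/// assert_eq!(trim_v1_suffix("http://localhost:11434/"), "http://localhost:11434");
/// ```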
fn trim_v1_suffix(base_url: &str) -> String {
    base_url
        .trim_end_matches('/')
        .trim_end_matches("/v1")
        .to_string()
}

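/// Recursively inserts `"additionalProperties": false` into every object
/// schema that does not already set it, which strict structured-output mode
/// requires.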
fn ensure_additional_properties_false(value: &mut Value) {
    match value {
        Value::Object(obj) => {
            let is_object_type = match obj.get("type") {
                Some(Value::String(t)) => t == "object",
                Some(Value::Array(types)) => types.iter().any(|t| t.as_str() == Some("object")),
                _ => obj.contains_key("properties") || obj.contains_key("required"),
            };
            if is_object_type && !obj.contains_key("additionalProperties") {
                obj.insert("additionalProperties".to_string(), Value::Bool(false));
            }
            for child in obj.values_mut() {
                ensure_additional_properties_false(child);
            }
        }
        Value::Array(items) => {
            for item in items.iter_mut() {
                ensure_additional_properties_false(item);
            }
        }
        _ => {}
    }
}

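// `async_trait` is applied with `?Send` on wasm32, where the produced futures
// are not `Send`.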
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl LlmClient for OpenAiCompatibleClient {
    fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
        match self.mode {
            OpenAiCompatibleMode::Responses => {
                let Some(delegate) = self.responses_delegate.as_ref() else {
                    let inner: LlmStream<'a> = Box::pin(futures::stream::once(async {
                        Err(LlmError::InvalidRequest {
                            message: "responses mode requires a configured delegate client"
                                .to_string(),
                        })
                    }));
                    return inner;
                };
                let inner: LlmStream<'a> = Box::pin(async_stream::try_stream! {
                    let translated = self.request_with_remote_model(request);
                    let mut stream = delegate.stream(&translated);
                    while let Some(event) = stream.next().await {
                        yield event?;
                    }
                });
                streaming::ensure_terminal_done(inner)
            }
            OpenAiCompatibleMode::ChatCompletions => {
                let inner: LlmStream<'a> = Box::pin(async_stream::try_stream! {
                    let body = self.build_chat_completions_body(request)?;
                    let response = self
                        .apply_auth(
                            self.http.post(format!("{}/chat/completions", self.base_url)),
                            "application/json",
                        )
                        .json(&body)
                        .send()
                        .await
                        .map_err(Self::map_send_error)?;

                    let status_code = response.status().as_u16();
                    let stream_result = if (200..=299).contains(&status_code) {
                        Ok(response.bytes_stream())
                    } else {
                        let headers = response.headers().clone();
                        let text = response.text().await.unwrap_or_default();
                        Err(LlmError::from_http_response(status_code, text, &headers))
                    };
                    let mut stream = stream_result?;
                    let mut buffer = String::with_capacity(512);
                    let mut tool_buffers: HashMap<usize, ToolCallBuffer> = HashMap::new();
                    let mut reasoning_text = String::new();
                    let mut done_emitted = false;

                    while let Some(chunk) = stream.next().await {
                        let chunk = chunk.map_err(|_| LlmError::ConnectionReset)?;
                        buffer.push_str(&String::from_utf8_lossy(&chunk));

                        while let Some(newline_pos) = buffer.find('\n') {
                            let line = buffer[..newline_pos].trim();
                            let should_process = !line.is_empty() && !line.starts_with(':');
                            let parsed = if should_process {
                                Self::parse_chat_completions_line(line)
                            } else {
                                Ok(None)
                            };
                            buffer.drain(..=newline_pos);

                            if let Some(event) = parsed? {
                                if let Some(event_usage) = event.usage {
                                    let usage = Usage {
                                        input_tokens: event_usage.prompt_tokens.unwrap_or(0),
                                        output_tokens: event_usage.completion_tokens.unwrap_or(0),
                                        cache_creation_tokens: None,
                                        cache_read_tokens: None,
                                    };
                                    yield LlmEvent::UsageUpdate { usage };
                                }

                                for choice in event.choices {
                                    if let Some(delta) = choice.delta {
                                        if let Some(content) = delta.content
                                            && !content.is_empty()
                                        {
                                            yield LlmEvent::TextDelta {
                                                delta: content,
                                                meta: None,
                                            };
                                        }
                                        let reasoning_delta = delta
                                            .reasoning_content
                                            .as_ref()
                                            .or(delta.reasoning.as_ref())
                                            .or(delta.thinking.as_ref());
                                        if let Some(reasoning) = reasoning_delta
                                            && !reasoning.is_empty()
                                        {
                                            reasoning_text.push_str(reasoning);
                                            yield LlmEvent::ReasoningDelta {
                                                delta: reasoning.clone(),
                                            };
                                        }
                                        if let Some(tool_calls) = delta.tool_calls {
                                            for tool_call in tool_calls {
                                                let index = tool_call.index.unwrap_or(0);
                                                let buffer = tool_buffers.entry(index).or_insert_with(|| {
                                                    ToolCallBuffer::new(
                                                        tool_call
                                                            .id
                                                            .clone()
                                                            .unwrap_or_else(|| format!("tool_call_{index}")),
                                                    )
                                                });
                                                if let Some(id) = tool_call.id
                                                    && buffer.id.starts_with("tool_call_")
                                                {
                                                    buffer.id = id;
                                                }
                                                if let Some(function) = tool_call.function {
                                                    if let Some(name) = function.name {
                                                        buffer.name = Some(name);
                                                    }
                                                    if let Some(arguments) = function.arguments
                                                        && !arguments.is_empty()
                                                    {
                                                        buffer.push_args(&arguments);
                                                        yield LlmEvent::ToolCallDelta {
                                                            id: buffer.id.clone(),
                                                            name: buffer.name.clone(),
                                                            args_delta: arguments,
                                                        };
                                                    }
                                                }
                                            }
                                        }
                                    }

                                    if let Some(finish_reason) = choice.finish_reason {
                                        let stop_reason = match finish_reason.as_str() {
                                            "tool_calls" => StopReason::ToolUse,
                                            "length" => StopReason::MaxTokens,
                                            "content_filter" => StopReason::ContentFilter,
                                            _ => StopReason::EndTurn,
                                        };
                                        if matches!(stop_reason, StopReason::ToolUse) {
                                            for buffer in tool_buffers.values() {
                                                if let Some(tool_call) = buffer.try_complete() {
                                                    yield LlmEvent::ToolCallComplete {
                                                        id: tool_call.id,
                                                        name: tool_call.name,
                                                        args: tool_call.args,
                                                        meta: None,
                                                    };
                                                }
                                            }
                                        }
                                        if !reasoning_text.is_empty() {
                                            yield LlmEvent::ReasoningComplete {
                                                text: std::mem::take(&mut reasoning_text),
                                                meta: None,
                                            };
                                        }
                                        if !done_emitted {
                                            done_emitted = true;
                                            yield LlmEvent::Done {
                                                outcome: LlmDoneOutcome::Success { stop_reason },
                                            };
                                        }
                                    }
                                }
                            }
                        }
                    }

                    if !buffer.trim().is_empty() {
                        Err::<(), _>(LlmError::IncompleteResponse {
                            message: format!(
                                "chat completions stream ended with an incomplete SSE buffer: {}",
                                buffer.trim()
                            ),
                        })?;
                    }
                    if !reasoning_text.is_empty() {
                        yield LlmEvent::ReasoningComplete {
                            text: reasoning_text,
                            meta: None,
                        };
                    }
                    if !done_emitted {
                        yield LlmEvent::Done {
                            outcome: LlmDoneOutcome::Success {
                                stop_reason: StopReason::EndTurn,
                            },
                        };
                    }
                });

                streaming::ensure_terminal_done(inner)
            }
        }
    }

    fn provider(&self) -> &'static str {
        "self_hosted"
    }

    async fn health_check(&self) -> Result<(), LlmError> {
        let response = self
            .apply_auth(
                self.http.get(format!("{}/models", self.base_url)),
                "application/json",
            )
            .send()
            .await
            .map_err(|e| LlmError::Unknown {
                message: e.to_string(),
            })?;
        let status = response.status().as_u16();
        if (200..=299).contains(&status) {
            Ok(())
        } else {
            let headers = response.headers().clone();
            let text = response.text().await.unwrap_or_default();
            Err(LlmError::from_http_response(status, text, &headers))
        }
    }

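    /// Compiles an output schema for this provider; in strict mode the schema
    /// is tightened by forcing `additionalProperties: false` on every object.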
    fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
        let mut schema = output_schema.schema.as_value().clone();
        if output_schema.strict {
            ensure_additional_properties_false(&mut schema);
        }
        Ok(CompiledSchema {
            schema,
            warnings: Vec::new(),
        })
    }
}

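/// Deserialization targets for chat-completions SSE chunks. Most fields are
/// optional so that servers with divergent streaming payloads still parse.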
#[derive(Debug, Deserialize)]
struct ChatCompletionsChunk {
    choices: Vec<ChatChoice>,
    #[serde(default)]
    usage: Option<ChatUsage>,
}

#[derive(Debug, Deserialize)]
struct ChatChoice {
    #[serde(default)]
    delta: Option<ChatDelta>,
    #[serde(default)]
    finish_reason: Option<String>,
}

#[derive(Debug, Deserialize)]
struct ChatDelta {
    #[serde(default)]
    content: Option<String>,
    #[serde(default)]
    reasoning: Option<String>,
    #[serde(default)]
    reasoning_content: Option<String>,
    #[serde(default)]
    thinking: Option<String>,
    #[serde(default)]
    tool_calls: Option<Vec<ChatToolCallDelta>>,
}

#[derive(Debug, Deserialize)]
struct ChatToolCallDelta {
    #[serde(default)]
    index: Option<usize>,
    #[serde(default)]
    id: Option<String>,
    #[serde(default)]
    function: Option<ChatFunctionDelta>,
}

#[derive(Debug, Deserialize)]
struct ChatFunctionDelta {
    #[serde(default)]
    name: Option<String>,
    #[serde(default)]
    arguments: Option<String>,
}

#[derive(Debug, Deserialize)]
struct ChatUsage {
    #[serde(default)]
    prompt_tokens: Option<u64>,
    #[serde(default)]
    completion_tokens: Option<u64>,
}

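// The tests below exercise the client against in-process axum stub servers
// that replay canned SSE payloads.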
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use axum::{
        Json, Router,
        extract::{Request, State},
        response::IntoResponse,
        routing::post,
    };
    use meerkat_core::UserMessage;
    use std::sync::{Arc, Mutex};
    use tokio::net::TcpListener;

    async fn chat_sse(State(payload): State<String>) -> impl IntoResponse {
        ([("content-type", "text/event-stream")], payload)
    }

    #[derive(Clone)]
    struct ResponsesStubState {
        payload: String,
        auth_headers: Arc<Mutex<Vec<Option<String>>>>,
    }

    async fn responses_sse(
        State(state): State<ResponsesStubState>,
        request: Request,
    ) -> impl IntoResponse {
        let auth = request
            .headers()
            .get("authorization")
            .and_then(|value| value.to_str().ok())
            .map(std::string::ToString::to_string);
        state
            .auth_headers
            .lock()
            .expect("auth header capture lock")
            .push(auth);
        ([("content-type", "text/event-stream")], state.payload)
    }

    async fn models() -> impl IntoResponse {
        Json(serde_json::json!({"data": []}))
    }

    async fn spawn_chat_stub_server(payload: String) -> (String, tokio::task::JoinHandle<()>) {
        let app = Router::new()
            .route("/v1/chat/completions", post(chat_sse))
            .route("/v1/models", axum::routing::get(models))
            .with_state(payload);
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test server");
        let addr = listener.local_addr().expect("local addr");
        let handle = tokio::spawn(async move {
            axum::serve(listener, app).await.expect("serve test server");
        });
        (format!("http://{addr}/v1"), handle)
    }

    async fn spawn_responses_stub_server(
        payload: String,
    ) -> (
        String,
        Arc<Mutex<Vec<Option<String>>>>,
        tokio::task::JoinHandle<()>,
    ) {
        let auth_headers = Arc::new(Mutex::new(Vec::new()));
        let app = Router::new()
            .route("/v1/responses", post(responses_sse))
            .route("/v1/models", axum::routing::get(models))
            .with_state(ResponsesStubState {
                payload,
                auth_headers: Arc::clone(&auth_headers),
            });
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test server");
        let addr = listener.local_addr().expect("local addr");
        let handle = tokio::spawn(async move {
            axum::serve(listener, app).await.expect("serve test server");
        });
        (format!("http://{addr}/v1"), auth_headers, handle)
    }

    #[tokio::test]
    async fn chat_completions_stream_accumulates_tool_calls() {
        let payload = concat!(
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"function\":{\"name\":\"read_file\",\"arguments\":\"{\\\"path\\\":\"}}]}}]}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"\\\"/tmp/a\\\"}\"}}]}}]}\n\n",
            "data: {\"choices\":[{\"finish_reason\":\"tool_calls\"}],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":4}}\n\n",
            "data: [DONE]\n\n"
        )
        .to_string();
        let (base_url, handle) = spawn_chat_stub_server(payload).await;
        let client = OpenAiCompatibleClient::new(
            OpenAiCompatibleMode::ChatCompletions,
            "remote-model".to_string(),
            base_url,
            None,
            true,
            false,
            false,
        );
        let request = LlmRequest::new(
            "gemma-4-31b",
            vec![Message::User(UserMessage::text("hello".to_string()))],
        );

        let events: Vec<_> = client.stream(&request).collect().await;
        let mut saw_complete = false;
        let mut saw_done = false;
        for event in events {
            let event = event.expect("event");
            match event {
                LlmEvent::ToolCallComplete { id, name, args, .. } => {
                    saw_complete = true;
                    assert_eq!(id, "call_1");
                    assert_eq!(name, "read_file");
                    assert_eq!(args["path"], "/tmp/a");
                }
                LlmEvent::Done { outcome } => {
                    saw_done = true;
                    assert!(matches!(
                        outcome,
                        LlmDoneOutcome::Success {
                            stop_reason: StopReason::ToolUse
                        }
                    ));
                }
                _ => {}
            }
        }
        assert!(saw_complete);
        assert!(saw_done);
        handle.abort();
    }

    #[tokio::test]
    async fn chat_completions_stream_emits_reasoning_events() {
        let payload = concat!(
            "data: {\"choices\":[{\"delta\":{\"reasoning_content\":\"Let me think. \"}}]}\n\n",
            "data: {\"choices\":[{\"delta\":{\"reasoning_content\":\"Need one more step.\"}}]}\n\n",
            "data: {\"choices\":[{\"delta\":{\"content\":\"Final answer\"},\"finish_reason\":\"stop\"}]}\n\n",
            "data: [DONE]\n\n"
        )
        .to_string();
        let (base_url, handle) = spawn_chat_stub_server(payload).await;
        let client = OpenAiCompatibleClient::new(
            OpenAiCompatibleMode::ChatCompletions,
            "remote-model".to_string(),
            base_url,
            None,
            true,
            true,
            true,
        );
        let request = LlmRequest::new(
            "gemma-4-31b",
            vec![Message::User(UserMessage::text("hello".to_string()))],
        );

        let events: Vec<_> = client.stream(&request).collect().await;
        let mut reasoning_deltas = Vec::new();
        let mut reasoning_complete = None;
        for event in events {
            match event.expect("event") {
                LlmEvent::ReasoningDelta { delta } => reasoning_deltas.push(delta),
                LlmEvent::ReasoningComplete { text, .. } => reasoning_complete = Some(text),
                _ => {}
            }
        }

        assert_eq!(
            reasoning_deltas,
            vec![
                "Let me think. ".to_string(),
                "Need one more step.".to_string()
            ]
        );
        assert_eq!(
            reasoning_complete,
            Some("Let me think. Need one more step.".to_string())
        );
        handle.abort();
    }

    #[test]
    fn build_chat_completions_body_preserves_reasoning_overrides() {
        let client = OpenAiCompatibleClient::new(
            OpenAiCompatibleMode::ChatCompletions,
            "remote-model".to_string(),
            "http://localhost:11434/v1".to_string(),
            None,
            true,
            true,
            true,
        );
        let request = LlmRequest::new(
            "gemma-4-31b",
            vec![Message::User(UserMessage::text("hello".to_string()))],
        )
        .with_openai_tag_merge(|t| {
            t.reasoning_effort =
                Some(meerkat_core::lifecycle::run_primitive::ReasoningEffort::Medium);
            t.chat_template_kwargs = Some(
                meerkat_core::lifecycle::run_primitive::OpaqueProviderBody::from_value(
                    &serde_json::json!({"enable_thinking": true}),
                ),
            );
            t.thinking = Some(
                meerkat_core::lifecycle::run_primitive::OpaqueProviderBody::from_value(
                    &serde_json::json!({"type": "enabled"}),
                ),
            );
        });

        let body = client
            .build_chat_completions_body(&request)
            .expect("body should build");

        assert_eq!(body["reasoning"]["effort"], "medium");
        assert_eq!(body["reasoning_effort"], "medium");
        assert_eq!(body["chat_template_kwargs"]["enable_thinking"], true);
        assert_eq!(body["thinking"]["type"], "enabled");
    }

    #[tokio::test]
    async fn responses_mode_uses_single_v1_prefix_and_omits_auth_when_unset() {
        let payload = concat!(
            "data: {\"type\":\"response.completed\",\"response\":{\"status\":\"completed\",\"output\":[{\"type\":\"message\",\"content\":[{\"type\":\"output_text\",\"text\":\"Hello\"}]}],\"usage\":{\"input_tokens\":10,\"output_tokens\":5}}}\n\n",
            "data: {\"type\":\"response.done\",\"response\":{\"status\":\"completed\",\"output\":[],\"usage\":{\"input_tokens\":10,\"output_tokens\":5}}}\n\n"
        )
        .to_string();
        let (base_url, auth_headers, handle) = spawn_responses_stub_server(payload).await;
        let client = OpenAiCompatibleClient::new(
            OpenAiCompatibleMode::Responses,
            "gemma4:e2b".to_string(),
            base_url,
            None,
            true,
            true,
            true,
        );
        let request = LlmRequest::new(
            "gemma-4-e2b",
            vec![Message::User(UserMessage::text("hello".to_string()))],
        );

        let events: Vec<_> = client.stream(&request).collect().await;
        assert!(
            events.iter().all(Result::is_ok),
            "responses mode should succeed against a single /v1/responses endpoint"
        );
        let auth_headers = auth_headers.lock().expect("auth header capture lock");
        assert_eq!(auth_headers.len(), 1);
        assert_eq!(auth_headers[0], None);
        handle.abort();
    }

    #[test]
    fn request_with_remote_model_preserves_self_hosted_capabilities_for_delegate() {
        let client = OpenAiCompatibleClient::new(
            OpenAiCompatibleMode::Responses,
            "gemma4:e2b".to_string(),
            "http://localhost:11434/v1".to_string(),
            None,
            true,
            true,
            true,
        );
        let request = LlmRequest::new(
            "gemma-4-e2b",
            vec![Message::User(UserMessage::text("hello".to_string()))],
        );

        let translated = client.request_with_remote_model(&request);

        assert_eq!(translated.model, "gemma4:e2b");
        let tag = match translated.provider_params.as_ref() {
            Some(meerkat_core::lifecycle::run_primitive::ProviderTag::OpenAi(t)) => t,
            other => unreachable!("expected OpenAi variant, got {other:?}"),
        };
        assert_eq!(tag.supports_temperature_override, Some(true));
        assert_eq!(tag.supports_reasoning_override, Some(true));
    }

    #[test]
    fn parse_chat_completions_line_accepts_sse_data_without_space() {
        let line = r#"data:{"choices":[{"delta":{"content":"Hello"}}]}"#;
        let chunk =
            OpenAiCompatibleClient::parse_chat_completions_line(line).expect("line should parse");
        assert!(chunk.is_some());
    }

    #[test]
    fn ensure_additional_properties_false_recurses_into_nested_objects() {
        let mut value = serde_json::json!({
            "type": "object",
            "properties": {
                "outer": {
                    "type": "object",
                    "properties": {
                        "inner": {
                            "type": "object",
                            "properties": {}
                        }
                    }
                }
            }
        });

        ensure_additional_properties_false(&mut value);

        assert_eq!(value["additionalProperties"], Value::Bool(false));
        assert_eq!(
            value["properties"]["outer"]["additionalProperties"],
            Value::Bool(false)
        );
        assert_eq!(
            value["properties"]["outer"]["properties"]["inner"]["additionalProperties"],
            Value::Bool(false)
        );
    }
}
997}