Skip to main content

meerkat_openai/
client_compatible.rs

1//! OpenAI-compatible client for self-hosted endpoints.
2
3use async_trait::async_trait;
4use futures::StreamExt;
5use meerkat_core::schema::{CompiledSchema, SchemaError};
6use meerkat_core::{
7    AssistantBlock, ContentBlock, ImageData, Message, OutputSchema, StopReason, Usage,
8};
9use meerkat_llm_core::LlmError;
10use meerkat_llm_core::{
11    LlmClient, LlmDoneOutcome, LlmEvent, LlmRequest, LlmStream, ToolCallBuffer,
12};
13use meerkat_llm_core::{http, streaming};
14use serde::Deserialize;
15use serde_json::Value;
16use std::collections::HashMap;
17
/// Wire protocol spoken by an OpenAI-compatible endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenAiCompatibleMode {
    /// OpenAI Responses API, proxied through a delegate `OpenAiClient`.
    Responses,
    /// Classic Chat Completions API, streamed as SSE by this client directly.
    ChatCompletions,
}
23
/// OpenAI-compatible client for self-hosted servers.
pub struct OpenAiCompatibleClient {
    /// Which wire protocol the remote endpoint speaks.
    mode: OpenAiCompatibleMode,
    /// Model identifier sent to the remote server (replaces the request's model).
    remote_model: String,
    /// Optional token sent as `Authorization: Bearer …` when present.
    bearer_token: Option<String>,
    /// Endpoint root, e.g. `http://host:port/v1`; paths like `/chat/completions`
    /// and `/models` are appended to it.
    base_url: String,
    /// HTTP client used for chat-completions streaming and health checks.
    http: reqwest::Client,
    /// Delegate client, constructed only in `Responses` mode.
    responses_delegate: Option<crate::OpenAiClient>,
    /// Whether the server honors the `temperature` parameter.
    supports_temperature: bool,
    /// Whether the server accepts `thinking` / `chat_template_kwargs` fields.
    supports_thinking: bool,
    /// Whether the server accepts reasoning-related fields.
    supports_reasoning: bool,
}
36
impl OpenAiCompatibleClient {
    /// Builds a client for an OpenAI-compatible endpoint rooted at `base_url`.
    ///
    /// In `Responses` mode a delegate `OpenAiClient` is constructed up front,
    /// pointed at `base_url` with any trailing `/v1` removed (the delegate
    /// appends its own path). The `supports_*` flags gate which optional
    /// request fields are forwarded to the server.
    pub fn new(
        mode: OpenAiCompatibleMode,
        remote_model: String,
        base_url: String,
        bearer_token: Option<String>,
        supports_temperature: bool,
        supports_thinking: bool,
        supports_reasoning: bool,
    ) -> Self {
        // Fall back to a plain default client if base-url-specific
        // construction fails; errors here are deliberately non-fatal.
        let http = http::build_http_client_for_base_url(reqwest::Client::builder(), &base_url)
            .unwrap_or_else(|_| reqwest::Client::new());
        let responses_delegate = matches!(mode, OpenAiCompatibleMode::Responses).then(|| {
            crate::OpenAiClient::new_with_optional_api_key_and_base_url(
                bearer_token.clone(),
                trim_v1_suffix(&base_url),
            )
        });
        Self {
            mode,
            remote_model,
            bearer_token,
            base_url,
            http,
            responses_delegate,
            supports_temperature,
            supports_thinking,
            supports_reasoning,
        }
    }

    /// Returns a copy of `request` retargeted at the remote model, with this
    /// client's capability overrides folded into the OpenAI provider tag.
    fn request_with_remote_model(&self, request: &LlmRequest) -> LlmRequest {
        use meerkat_core::lifecycle::run_primitive::{OpenAiProviderTag, ProviderTag};
        let mut request = request.clone();
        request.model = self.remote_model.clone();
        // Reuse an existing OpenAI tag if present; a tag for another provider
        // (or no tag at all) starts from defaults.
        let mut tag = match request.provider_params.take() {
            Some(ProviderTag::OpenAi(t)) => t,
            Some(_) => OpenAiProviderTag::default(),
            None => OpenAiProviderTag::default(),
        };
        tag.supports_temperature_override = Some(self.supports_temperature);
        tag.supports_reasoning_override = Some(self.supports_reasoning);
        request.provider_params = Some(ProviderTag::OpenAi(tag));
        request
    }

    /// Maps a transport-level `reqwest` failure onto the crate's error type.
    fn map_send_error(error: reqwest::Error) -> LlmError {
        if error.is_timeout() {
            // NOTE(review): 30000 ms is a fixed placeholder, not the actual
            // configured timeout — confirm whether the real value is available.
            LlmError::NetworkTimeout { duration_ms: 30000 }
        } else if Self::is_connection_error(&error) {
            LlmError::ConnectionReset
        } else {
            LlmError::Unknown {
                message: error.to_string(),
            }
        }
    }

    /// Whether `error` represents a failure to establish a connection.
    #[cfg(not(target_arch = "wasm32"))]
    fn is_connection_error(error: &reqwest::Error) -> bool {
        error.is_connect()
    }

    /// On wasm, `reqwest` cannot classify connect errors, so never report one.
    #[cfg(target_arch = "wasm32")]
    fn is_connection_error(_error: &reqwest::Error) -> bool {
        false
    }

    /// Builds the JSON body for a streaming `/chat/completions` request,
    /// including tools, optional temperature/reasoning/thinking fields, and a
    /// strict `response_format` when structured output is requested.
    fn build_chat_completions_body(&self, request: &LlmRequest) -> Result<Value, LlmError> {
        let mut body = serde_json::json!({
            "model": self.remote_model,
            "messages": Self::convert_to_chat_messages(&request.messages)?,
            "stream": true,
            // Ask the server to append a usage chunk to the stream.
            "stream_options": { "include_usage": true },
            "max_completion_tokens": request.max_tokens,
        });

        // Temperature is only sent when the server supports it and the value
        // is representable as a finite JSON number.
        if self.supports_temperature
            && let Some(temp) = request.temperature
            && let Some(num) = serde_json::Number::from_f64(temp as f64)
        {
            body["temperature"] = Value::Number(num);
        }

        if !request.tools.is_empty() {
            body["tools"] = Value::Array(
                request
                    .tools
                    .iter()
                    .map(|tool| {
                        serde_json::json!({
                            "type": "function",
                            "function": {
                                "name": tool.name,
                                "description": tool.description,
                                "parameters": tool.input_schema
                            }
                        })
                    })
                    .collect(),
            );
        }

        if let Some(tag) = crate::client::openai_tag(request) {
            use meerkat_core::lifecycle::run_primitive::ReasoningEffort as TypedReasoningEffort;
            if self.supports_reasoning {
                if let Some(reasoning) = tag.reasoning.as_ref() {
                    let v = reasoning.as_value();
                    if v.is_object() {
                        body["reasoning"] = v;
                    }
                }
                if let Some(effort) = tag.reasoning_effort {
                    let s = match effort {
                        TypedReasoningEffort::Low => "low",
                        TypedReasoningEffort::Medium => "medium",
                        TypedReasoningEffort::High => "high",
                    };
                    if !body["reasoning"].is_object() {
                        body["reasoning"] = serde_json::json!({});
                    }
                    // Set effort in both shapes — nested `reasoning.effort`
                    // and flat `reasoning_effort` — since servers differ in
                    // which one they read.
                    body["reasoning"]["effort"] = Value::String(s.to_string());
                    body["reasoning_effort"] = Value::String(s.to_string());
                }
                if self.supports_thinking
                    && let Some(chat_template_kwargs) = tag.chat_template_kwargs.as_ref()
                {
                    body["chat_template_kwargs"] = chat_template_kwargs.as_value();
                }
                if self.supports_thinking
                    && let Some(thinking) = tag.thinking.as_ref()
                {
                    body["thinking"] = thinking.as_value();
                }
            }
            if let Some(output_schema) = tag.structured_output.as_ref() {
                let compiled =
                    self.compile_schema(output_schema)
                        .map_err(|e| LlmError::InvalidRequest {
                            message: e.to_string(),
                        })?;
                body["response_format"] = serde_json::json!({
                    "type": "json_schema",
                    "json_schema": {
                        "name": output_schema.name.as_deref().unwrap_or("output"),
                        "schema": compiled.schema,
                        "strict": output_schema.strict
                    }
                });
            }
        }

        Ok(body)
    }

    /// Converts internal `Message`s into the chat-completions message array.
    ///
    /// System notices are downgraded to user messages; images are inlined as
    /// data URLs (blob images fall back to their text projection); assistant
    /// tool calls become `tool_calls` entries; each tool result becomes its
    /// own `role: "tool"` message.
    fn convert_to_chat_messages(messages: &[Message]) -> Result<Vec<Value>, LlmError> {
        let mut out = Vec::new();
        for message in messages {
            match message {
                Message::System(system) => {
                    out.push(serde_json::json!({
                        "role": "system",
                        "content": system.content
                    }));
                }
                Message::SystemNotice(notice) => {
                    // Notices are rendered as user-visible text rather than a
                    // second system message.
                    out.push(serde_json::json!({
                        "role": "user",
                        "content": notice.rendered_text()
                    }));
                }
                Message::User(user) => {
                    if meerkat_core::has_non_text_content(&user.content) {
                        // Multi-part content: emit the structured parts array.
                        let content: Vec<Value> = user
                            .content
                            .iter()
                            .map(|block| match block {
                                ContentBlock::Text { text } => serde_json::json!({
                                    "type": "text",
                                    "text": text
                                }),
                                ContentBlock::Image { media_type, data } => match data {
                                    ImageData::Inline { data } => serde_json::json!({
                                        "type": "image_url",
                                        "image_url": {
                                            "url": format!("data:{media_type};base64,{data}")
                                        }
                                    }),
                                    // Blob references cannot be inlined; fall
                                    // back to a textual description.
                                    ImageData::Blob { .. } => serde_json::json!({
                                        "type": "text",
                                        "text": block.text_projection()
                                    }),
                                },
                                _ => serde_json::json!({
                                    "type": "text",
                                    "text": block.text_projection()
                                }),
                            })
                            .collect();
                        out.push(serde_json::json!({
                            "role": "user",
                            "content": content
                        }));
                    } else {
                        // Text-only content: use the simple string form.
                        out.push(serde_json::json!({
                            "role": "user",
                            "content": user.text_content()
                        }));
                    }
                }
                Message::Assistant(assistant) => {
                    let tool_calls: Vec<Value> = assistant
                        .tool_calls
                        .iter()
                        .map(|tool_call| {
                            serde_json::json!({
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": tool_call.name,
                                    // Arguments are serialized as a JSON string,
                                    // per the chat-completions wire format.
                                    "arguments": tool_call.args.to_string(),
                                }
                            })
                        })
                        .collect();
                    out.push(serde_json::json!({
                        "role": "assistant",
                        "content": if assistant.content.is_empty() {
                            Value::Null
                        } else {
                            Value::String(assistant.content.clone())
                        },
                        "tool_calls": tool_calls
                    }));
                }
                Message::BlockAssistant(assistant) => {
                    // Flatten block-structured assistant output into one text
                    // string plus a tool_calls array; other block kinds are
                    // dropped here.
                    let mut text_parts = Vec::new();
                    let mut tool_calls = Vec::new();
                    for block in &assistant.blocks {
                        match block {
                            AssistantBlock::Text { text, .. } => {
                                if !text.is_empty() {
                                    text_parts.push(text.clone());
                                }
                            }
                            AssistantBlock::ToolUse { id, name, args, .. } => {
                                tool_calls.push(serde_json::json!({
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "name": name,
                                        "arguments": args.get(),
                                    }
                                }));
                            }
                            _ => {}
                        }
                    }
                    out.push(serde_json::json!({
                        "role": "assistant",
                        "content": if text_parts.is_empty() {
                            Value::Null
                        } else {
                            Value::String(text_parts.join("\n"))
                        },
                        "tool_calls": tool_calls
                    }));
                }
                Message::ToolResults { results, .. } => {
                    // One "tool" message per result, keyed by the originating
                    // tool_use id.
                    for result in results {
                        out.push(serde_json::json!({
                            "role": "tool",
                            "tool_call_id": result.tool_use_id,
                            "content": result.text_content()
                        }));
                    }
                }
            }
        }
        Ok(out)
    }

    /// Adds the content-type header and, when configured, the bearer token to
    /// an outgoing request.
    fn apply_auth(
        &self,
        request: reqwest::RequestBuilder,
        content_type: &'static str,
    ) -> reqwest::RequestBuilder {
        let request = request.header("Content-Type", content_type);
        if let Some(token) = &self.bearer_token {
            request.header("Authorization", format!("Bearer {token}"))
        } else {
            request
        }
    }

    /// Parses one SSE line into a chunk.
    ///
    /// Returns `Ok(None)` for non-data lines and for the `[DONE]` sentinel;
    /// malformed JSON in a data line is a `StreamParseError`.
    fn parse_chat_completions_line(line: &str) -> Result<Option<ChatCompletionsChunk>, LlmError> {
        // Accept both "data: " and "data:" prefixes — servers vary on the
        // space after the colon.
        if let Some(data) = line
            .strip_prefix("data: ")
            .or_else(|| line.strip_prefix("data:"))
        {
            if data == "[DONE]" {
                return Ok(None);
            }
            serde_json::from_str(data)
                .map(Some)
                .map_err(|err| LlmError::StreamParseError {
                    message: format!("failed to parse chat completions chunk: {err}; line={data}"),
                })
        } else {
            Ok(None)
        }
    }
}
350
/// Strips a single trailing `/v1` path segment (and any trailing slashes)
/// from `base_url`, yielding the server root expected by the Responses
/// delegate client.
///
/// Uses `strip_suffix` so the segment is removed at most once; the previous
/// `trim_end_matches("/v1")` removed *every* trailing repetition, collapsing
/// e.g. `http://host/v1/v1` all the way to `http://host`.
fn trim_v1_suffix(base_url: &str) -> String {
    let trimmed = base_url.trim_end_matches('/');
    trimmed.strip_suffix("/v1").unwrap_or(trimmed).to_string()
}
357
358fn ensure_additional_properties_false(value: &mut Value) {
359    match value {
360        Value::Object(obj) => {
361            let is_object_type = match obj.get("type") {
362                Some(Value::String(t)) => t == "object",
363                Some(Value::Array(types)) => types.iter().any(|t| t.as_str() == Some("object")),
364                _ => obj.contains_key("properties") || obj.contains_key("required"),
365            };
366            if is_object_type && !obj.contains_key("additionalProperties") {
367                obj.insert("additionalProperties".to_string(), Value::Bool(false));
368            }
369            for child in obj.values_mut() {
370                ensure_additional_properties_false(child);
371            }
372        }
373        Value::Array(items) => {
374            for item in items.iter_mut() {
375                ensure_additional_properties_false(item);
376            }
377        }
378        _ => {}
379    }
380}
381
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl LlmClient for OpenAiCompatibleClient {
    /// Streams a completion for `request`, dispatching on the configured mode.
    ///
    /// - `Responses`: forwards to the delegate `OpenAiClient` after swapping
    ///   in the remote model and provider overrides.
    /// - `ChatCompletions`: posts to `{base_url}/chat/completions` and parses
    ///   the SSE response line-by-line into `LlmEvent`s.
    ///
    /// Both paths are wrapped in `ensure_terminal_done` so consumers always
    /// observe a terminal `Done` event.
    fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
        match self.mode {
            OpenAiCompatibleMode::Responses => {
                let Some(delegate) = self.responses_delegate.as_ref() else {
                    // Misconfiguration: Responses mode without a delegate.
                    // Surface a single error event instead of panicking.
                    let inner: LlmStream<'a> = Box::pin(futures::stream::once(async {
                        Err(LlmError::InvalidRequest {
                            message: "responses mode requires a configured delegate client"
                                .to_string(),
                        })
                    }));
                    return inner;
                };
                let inner: LlmStream<'a> = Box::pin(async_stream::try_stream! {
                    let translated = self.request_with_remote_model(request);
                    let mut stream = delegate.stream(&translated);
                    while let Some(event) = stream.next().await {
                        yield event?;
                    }
                });
                streaming::ensure_terminal_done(inner)
            }
            OpenAiCompatibleMode::ChatCompletions => {
                let inner: LlmStream<'a> = Box::pin(async_stream::try_stream! {
                    let body = self.build_chat_completions_body(request)?;
                    let response = self
                        .apply_auth(
                            self.http.post(format!("{}/chat/completions", self.base_url)),
                            "application/json",
                        )
                        .json(&body)
                        .send()
                        .await
                        .map_err(Self::map_send_error)?;

                    // Non-2xx responses are converted to a typed error built
                    // from the status, body text, and headers.
                    let status_code = response.status().as_u16();
                    let stream_result = if (200..=299).contains(&status_code) {
                        Ok(response.bytes_stream())
                    } else {
                        let headers = response.headers().clone();
                        let text = response.text().await.unwrap_or_default();
                        Err(LlmError::from_http_response(status_code, text, &headers))
                    };
                    let mut stream = stream_result?;
                    // `buffer` accumulates raw bytes until a full SSE line is
                    // available; `tool_buffers` accumulates partial tool calls
                    // keyed by their stream index.
                    let mut buffer = String::with_capacity(512);
                    let mut tool_buffers: HashMap<usize, ToolCallBuffer> = HashMap::new();
                    let mut reasoning_text = String::new();
                    let mut done_emitted = false;

                    while let Some(chunk) = stream.next().await {
                        // NOTE(review): the underlying reqwest error is
                        // discarded here and always reported as a reset —
                        // confirm that is intentional.
                        let chunk = chunk.map_err(|_| LlmError::ConnectionReset)?;
                        buffer.push_str(&String::from_utf8_lossy(&chunk));

                        // Process every complete line currently buffered.
                        while let Some(newline_pos) = buffer.find('\n') {
                            let line = buffer[..newline_pos].trim();
                            // Skip blank lines and SSE comments (": …").
                            let should_process = !line.is_empty() && !line.starts_with(':');
                            // Parse before draining: `line` borrows `buffer`.
                            let parsed = if should_process {
                                Self::parse_chat_completions_line(line)
                            } else {
                                Ok(None)
                            };
                            buffer.drain(..=newline_pos);

                            if let Some(event) = parsed? {
                                if let Some(event_usage) = event.usage {
                                    let usage = Usage {
                                        input_tokens: event_usage.prompt_tokens.unwrap_or(0),
                                        output_tokens: event_usage.completion_tokens.unwrap_or(0),
                                        cache_creation_tokens: None,
                                        cache_read_tokens: None,
                                    };
                                    yield LlmEvent::UsageUpdate { usage };
                                }

                                for choice in event.choices {
                                    if let Some(delta) = choice.delta {
                                        if let Some(content) = delta.content
                                            && !content.is_empty()
                                        {
                                            yield LlmEvent::TextDelta {
                                                delta: content,
                                                meta: None,
                                            };
                                        }
                                        // Servers disagree on the field name
                                        // for reasoning output; accept all
                                        // three, in priority order.
                                        let reasoning_delta = delta
                                            .reasoning_content
                                            .as_ref()
                                            .or(delta.reasoning.as_ref())
                                            .or(delta.thinking.as_ref());
                                        if let Some(reasoning) = reasoning_delta
                                            && !reasoning.is_empty()
                                        {
                                            reasoning_text.push_str(reasoning);
                                            yield LlmEvent::ReasoningDelta {
                                                delta: reasoning.clone(),
                                            };
                                        }
                                        if let Some(tool_calls) = delta.tool_calls {
                                            for tool_call in tool_calls {
                                                let index = tool_call.index.unwrap_or(0);
                                                // A synthetic id is assigned
                                                // until the server provides
                                                // the real one.
                                                let buffer = tool_buffers.entry(index).or_insert_with(|| {
                                                    ToolCallBuffer::new(
                                                        tool_call
                                                            .id
                                                            .clone()
                                                            .unwrap_or_else(|| format!("tool_call_{index}")),
                                                    )
                                                });
                                                // Upgrade a synthetic id once
                                                // the real id arrives.
                                                if let Some(id) = tool_call.id
                                                    && buffer.id.starts_with("tool_call_")
                                                {
                                                    buffer.id = id;
                                                }
                                                if let Some(function) = tool_call.function {
                                                    if let Some(name) = function.name {
                                                        buffer.name = Some(name);
                                                    }
                                                    if let Some(arguments) = function.arguments
                                                        && !arguments.is_empty()
                                                    {
                                                        buffer.push_args(&arguments);
                                                        yield LlmEvent::ToolCallDelta {
                                                            id: buffer.id.clone(),
                                                            name: buffer.name.clone(),
                                                            args_delta: arguments,
                                                        };
                                                    }
                                                }
                                            }
                                        }
                                    }

                                    if let Some(finish_reason) = choice.finish_reason {
                                        let stop_reason = match finish_reason.as_str() {
                                            "tool_calls" => StopReason::ToolUse,
                                            "length" => StopReason::MaxTokens,
                                            "content_filter" => StopReason::ContentFilter,
                                            _ => StopReason::EndTurn,
                                        };
                                        if matches!(stop_reason, StopReason::ToolUse) {
                                            // NOTE(review): HashMap iteration
                                            // makes completion order across
                                            // multiple tool calls
                                            // nondeterministic — confirm
                                            // callers don't rely on index
                                            // order.
                                            for buffer in tool_buffers.values() {
                                                if let Some(tool_call) = buffer.try_complete() {
                                                    yield LlmEvent::ToolCallComplete {
                                                        id: tool_call.id,
                                                        name: tool_call.name,
                                                        args: tool_call.args,
                                                        meta: None,
                                                    };
                                                }
                                            }
                                        }
                                        // Flush accumulated reasoning before
                                        // the terminal event.
                                        if !reasoning_text.is_empty() {
                                            yield LlmEvent::ReasoningComplete {
                                                text: std::mem::take(&mut reasoning_text),
                                                meta: None,
                                            };
                                        }
                                        if !done_emitted {
                                            done_emitted = true;
                                            yield LlmEvent::Done {
                                                outcome: LlmDoneOutcome::Success { stop_reason },
                                            };
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // A leftover partial line means the stream was cut off
                    // mid-event.
                    if !buffer.trim().is_empty() {
                        Err::<(), _>(LlmError::IncompleteResponse {
                            message: format!(
                                "chat completions stream ended with an incomplete SSE buffer: {}",
                                buffer.trim()
                            ),
                        })?;
                    }
                    // Flush reasoning / emit Done if the server never sent a
                    // finish_reason.
                    if !reasoning_text.is_empty() {
                        yield LlmEvent::ReasoningComplete {
                            text: reasoning_text,
                            meta: None,
                        };
                    }
                    if !done_emitted {
                        yield LlmEvent::Done {
                            outcome: LlmDoneOutcome::Success {
                                stop_reason: StopReason::EndTurn,
                            },
                        };
                    }
                });

                streaming::ensure_terminal_done(inner)
            }
        }
    }

    /// Provider label reported for this client.
    fn provider(&self) -> &'static str {
        "self_hosted"
    }

    /// Probes `{base_url}/models`; any 2xx response counts as healthy.
    async fn health_check(&self) -> Result<(), LlmError> {
        let response = self
            .apply_auth(
                self.http.get(format!("{}/models", self.base_url)),
                "application/json",
            )
            .send()
            .await
            .map_err(|e| LlmError::Unknown {
                message: e.to_string(),
            })?;
        let status = response.status().as_u16();
        if (200..=299).contains(&status) {
            Ok(())
        } else {
            let headers = response.headers().clone();
            let text = response.text().await.unwrap_or_default();
            Err(LlmError::from_http_response(status, text, &headers))
        }
    }

    /// Compiles an output schema for this provider.
    ///
    /// In strict mode every object node is given `additionalProperties: false`
    /// (recursively), matching strict structured-output requirements; no
    /// warnings are produced.
    fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
        let mut schema = output_schema.schema.as_value().clone();
        if output_schema.strict {
            ensure_additional_properties_false(&mut schema);
        }
        Ok(CompiledSchema {
            schema,
            warnings: Vec::new(),
        })
    }
}
617
/// One deserialized chat-completions SSE chunk.
#[derive(Debug, Deserialize)]
struct ChatCompletionsChunk {
    choices: Vec<ChatChoice>,
    // Usage typically only appears on the final chunk when
    // `stream_options.include_usage` is set.
    #[serde(default)]
    usage: Option<ChatUsage>,
}
624
/// One choice within a streamed chunk.
#[derive(Debug, Deserialize)]
struct ChatChoice {
    // Incremental payload; absent on pure finish-reason chunks.
    #[serde(default)]
    delta: Option<ChatDelta>,
    // Set on the chunk that terminates this choice (e.g. "stop",
    // "tool_calls", "length", "content_filter").
    #[serde(default)]
    finish_reason: Option<String>,
}
632
/// Incremental message content within a choice.
#[derive(Debug, Deserialize)]
struct ChatDelta {
    #[serde(default)]
    content: Option<String>,
    // Reasoning output arrives under different field names depending on the
    // server; all three variants are accepted.
    #[serde(default)]
    reasoning: Option<String>,
    #[serde(default)]
    reasoning_content: Option<String>,
    #[serde(default)]
    thinking: Option<String>,
    #[serde(default)]
    tool_calls: Option<Vec<ChatToolCallDelta>>,
}
646
/// Partial tool-call fragment; fragments sharing an `index` are accumulated
/// into one call.
#[derive(Debug, Deserialize)]
struct ChatToolCallDelta {
    #[serde(default)]
    index: Option<usize>,
    // Usually present only on the first fragment of a call.
    #[serde(default)]
    id: Option<String>,
    #[serde(default)]
    function: Option<ChatFunctionDelta>,
}
656
/// Partial function name/arguments for a tool-call fragment.
#[derive(Debug, Deserialize)]
struct ChatFunctionDelta {
    #[serde(default)]
    name: Option<String>,
    // JSON-encoded argument text, streamed in pieces.
    #[serde(default)]
    arguments: Option<String>,
}
664
/// Token usage as reported by the server on the final chunk.
#[derive(Debug, Deserialize)]
struct ChatUsage {
    #[serde(default)]
    prompt_tokens: Option<u64>,
    #[serde(default)]
    completion_tokens: Option<u64>,
}
672
673#[cfg(test)]
674#[allow(clippy::unwrap_used, clippy::expect_used)]
675mod tests {
676    use super::*;
677    use axum::{
678        Json, Router,
679        extract::{Request, State},
680        response::IntoResponse,
681        routing::post,
682    };
683    use meerkat_core::UserMessage;
684    use std::sync::{Arc, Mutex};
685    use tokio::net::TcpListener;
686
687    async fn chat_sse(State(payload): State<String>) -> impl IntoResponse {
688        ([("content-type", "text/event-stream")], payload)
689    }
690
    /// Shared state for the responses-mode stub server.
    #[derive(Clone)]
    struct ResponsesStubState {
        // SSE payload returned to every request.
        payload: String,
        // Captured `Authorization` header values, one entry per request.
        auth_headers: Arc<Mutex<Vec<Option<String>>>>,
    }
696
697    async fn responses_sse(
698        State(state): State<ResponsesStubState>,
699        request: Request,
700    ) -> impl IntoResponse {
701        let auth = request
702            .headers()
703            .get("authorization")
704            .and_then(|value| value.to_str().ok())
705            .map(std::string::ToString::to_string);
706        state
707            .auth_headers
708            .lock()
709            .expect("auth header capture lock")
710            .push(auth);
711        ([("content-type", "text/event-stream")], state.payload)
712    }
713
    /// Stub `/models` endpoint returning an empty model list, used by
    /// health-check tests.
    async fn models() -> impl IntoResponse {
        Json(serde_json::json!({"data": []}))
    }
717
718    async fn spawn_chat_stub_server(payload: String) -> (String, tokio::task::JoinHandle<()>) {
719        let app = Router::new()
720            .route("/v1/chat/completions", post(chat_sse))
721            .route("/v1/models", axum::routing::get(models))
722            .with_state(payload);
723        let listener = TcpListener::bind("127.0.0.1:0")
724            .await
725            .expect("bind test server");
726        let addr = listener.local_addr().expect("local addr");
727        let handle = tokio::spawn(async move {
728            axum::serve(listener, app).await.expect("serve test server");
729        });
730        (format!("http://{addr}/v1"), handle)
731    }
732
733    async fn spawn_responses_stub_server(
734        payload: String,
735    ) -> (
736        String,
737        Arc<Mutex<Vec<Option<String>>>>,
738        tokio::task::JoinHandle<()>,
739    ) {
740        let auth_headers = Arc::new(Mutex::new(Vec::new()));
741        let app = Router::new()
742            .route("/v1/responses", post(responses_sse))
743            .route("/v1/models", axum::routing::get(models))
744            .with_state(ResponsesStubState {
745                payload,
746                auth_headers: Arc::clone(&auth_headers),
747            });
748        let listener = TcpListener::bind("127.0.0.1:0")
749            .await
750            .expect("bind test server");
751        let addr = listener.local_addr().expect("local addr");
752        let handle = tokio::spawn(async move {
753            axum::serve(listener, app).await.expect("serve test server");
754        });
755        (format!("http://{addr}/v1"), auth_headers, handle)
756    }
757
758    #[tokio::test]
759    async fn chat_completions_stream_accumulates_tool_calls() {
760        let payload = concat!(
761            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"function\":{\"name\":\"read_file\",\"arguments\":\"{\\\"path\\\":\"}}]}}]}\n\n",
762            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"\\\"/tmp/a\\\"}\"}}]}}]}\n\n",
763            "data: {\"choices\":[{\"finish_reason\":\"tool_calls\"}],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":4}}\n\n",
764            "data: [DONE]\n\n"
765        )
766        .to_string();
767        let (base_url, handle) = spawn_chat_stub_server(payload).await;
768        let client = OpenAiCompatibleClient::new(
769            OpenAiCompatibleMode::ChatCompletions,
770            "remote-model".to_string(),
771            base_url,
772            None,
773            true,
774            false,
775            false,
776        );
777        let request = LlmRequest::new(
778            "gemma-4-31b",
779            vec![Message::User(UserMessage::text("hello".to_string()))],
780        );
781
782        let events: Vec<_> = client.stream(&request).collect().await;
783        let mut saw_complete = false;
784        let mut saw_done = false;
785        for event in events {
786            let event = event.expect("event");
787            match event {
788                LlmEvent::ToolCallComplete { id, name, args, .. } => {
789                    saw_complete = true;
790                    assert_eq!(id, "call_1");
791                    assert_eq!(name, "read_file");
792                    assert_eq!(args["path"], "/tmp/a");
793                }
794                LlmEvent::Done { outcome } => {
795                    saw_done = true;
796                    assert!(matches!(
797                        outcome,
798                        LlmDoneOutcome::Success {
799                            stop_reason: StopReason::ToolUse
800                        }
801                    ));
802                }
803                _ => {}
804            }
805        }
806        assert!(saw_complete);
807        assert!(saw_done);
808        handle.abort();
809    }
810
811    #[tokio::test]
812    async fn chat_completions_stream_emits_reasoning_events() {
813        let payload = concat!(
814            "data: {\"choices\":[{\"delta\":{\"reasoning_content\":\"Let me think. \"}}]}\n\n",
815            "data: {\"choices\":[{\"delta\":{\"reasoning_content\":\"Need one more step.\"}}]}\n\n",
816            "data: {\"choices\":[{\"delta\":{\"content\":\"Final answer\"},\"finish_reason\":\"stop\"}]}\n\n",
817            "data: [DONE]\n\n"
818        )
819        .to_string();
820        let (base_url, handle) = spawn_chat_stub_server(payload).await;
821        let client = OpenAiCompatibleClient::new(
822            OpenAiCompatibleMode::ChatCompletions,
823            "remote-model".to_string(),
824            base_url,
825            None,
826            true,
827            true,
828            true,
829        );
830        let request = LlmRequest::new(
831            "gemma-4-31b",
832            vec![Message::User(UserMessage::text("hello".to_string()))],
833        );
834
835        let events: Vec<_> = client.stream(&request).collect().await;
836        let mut reasoning_deltas = Vec::new();
837        let mut reasoning_complete = None;
838        for event in events {
839            match event.expect("event") {
840                LlmEvent::ReasoningDelta { delta } => reasoning_deltas.push(delta),
841                LlmEvent::ReasoningComplete { text, .. } => reasoning_complete = Some(text),
842                _ => {}
843            }
844        }
845
846        assert_eq!(
847            reasoning_deltas,
848            vec![
849                "Let me think. ".to_string(),
850                "Need one more step.".to_string()
851            ]
852        );
853        assert_eq!(
854            reasoning_complete,
855            Some("Let me think. Need one more step.".to_string())
856        );
857        handle.abort();
858    }
859
860    #[test]
861    fn build_chat_completions_body_preserves_reasoning_overrides() {
862        let client = OpenAiCompatibleClient::new(
863            OpenAiCompatibleMode::ChatCompletions,
864            "remote-model".to_string(),
865            "http://localhost:11434/v1".to_string(),
866            None,
867            true,
868            true,
869            true,
870        );
871        let request = LlmRequest::new(
872            "gemma-4-31b",
873            vec![Message::User(UserMessage::text("hello".to_string()))],
874        )
875        .with_openai_tag_merge(|t| {
876            t.reasoning_effort =
877                Some(meerkat_core::lifecycle::run_primitive::ReasoningEffort::Medium);
878            t.chat_template_kwargs = Some(
879                meerkat_core::lifecycle::run_primitive::OpaqueProviderBody::from_value(
880                    &serde_json::json!({"enable_thinking": true}),
881                ),
882            );
883            t.thinking = Some(
884                meerkat_core::lifecycle::run_primitive::OpaqueProviderBody::from_value(
885                    &serde_json::json!({"type": "enabled"}),
886                ),
887            );
888        });
889
890        let body = client
891            .build_chat_completions_body(&request)
892            .expect("body should build");
893
894        assert_eq!(body["reasoning"]["effort"], "medium");
895        assert_eq!(body["reasoning_effort"], "medium");
896        assert_eq!(body["chat_template_kwargs"]["enable_thinking"], true);
897        assert_eq!(body["thinking"]["type"], "enabled");
898    }
899
900    #[tokio::test]
901    async fn responses_mode_uses_single_v1_prefix_and_omits_auth_when_unset() {
902        let payload = concat!(
903            "data: {\"type\":\"response.completed\",\"response\":{\"status\":\"completed\",\"output\":[{\"type\":\"message\",\"content\":[{\"type\":\"output_text\",\"text\":\"Hello\"}]}],\"usage\":{\"input_tokens\":10,\"output_tokens\":5}}}\n\n",
904            "data: {\"type\":\"response.done\",\"response\":{\"status\":\"completed\",\"output\":[],\"usage\":{\"input_tokens\":10,\"output_tokens\":5}}}\n\n"
905        )
906        .to_string();
907        let (base_url, auth_headers, handle) = spawn_responses_stub_server(payload).await;
908        let client = OpenAiCompatibleClient::new(
909            OpenAiCompatibleMode::Responses,
910            "gemma4:e2b".to_string(),
911            base_url,
912            None,
913            true,
914            true,
915            true,
916        );
917        let request = LlmRequest::new(
918            "gemma-4-e2b",
919            vec![Message::User(UserMessage::text("hello".to_string()))],
920        );
921
922        let events: Vec<_> = client.stream(&request).collect().await;
923        assert!(
924            events.iter().all(Result::is_ok),
925            "responses mode should succeed against a single /v1/responses endpoint"
926        );
927        let auth_headers = auth_headers.lock().expect("auth header capture lock");
928        assert_eq!(auth_headers.len(), 1);
929        assert_eq!(auth_headers[0], None);
930        handle.abort();
931    }
932
933    #[test]
934    fn request_with_remote_model_preserves_self_hosted_capabilities_for_delegate() {
935        let client = OpenAiCompatibleClient::new(
936            OpenAiCompatibleMode::Responses,
937            "gemma4:e2b".to_string(),
938            "http://localhost:11434/v1".to_string(),
939            None,
940            true,
941            true,
942            true,
943        );
944        let request = LlmRequest::new(
945            "gemma-4-e2b",
946            vec![Message::User(UserMessage::text("hello".to_string()))],
947        );
948
949        let translated = client.request_with_remote_model(&request);
950
951        assert_eq!(translated.model, "gemma4:e2b");
952        let tag = match translated.provider_params.as_ref() {
953            Some(meerkat_core::lifecycle::run_primitive::ProviderTag::OpenAi(t)) => t,
954            other => unreachable!("expected OpenAi variant, got {other:?}"),
955        };
956        assert_eq!(tag.supports_temperature_override, Some(true));
957        assert_eq!(tag.supports_reasoning_override, Some(true));
958    }
959
960    #[test]
961    fn parse_chat_completions_line_accepts_sse_data_without_space() {
962        let line = r#"data:{"choices":[{"delta":{"content":"Hello"}}]}"#;
963        let chunk =
964            OpenAiCompatibleClient::parse_chat_completions_line(line).expect("line should parse");
965        assert!(chunk.is_some());
966    }
967
968    #[test]
969    fn ensure_additional_properties_false_recurses_into_nested_objects() {
970        let mut value = serde_json::json!({
971            "type": "object",
972            "properties": {
973                "outer": {
974                    "type": "object",
975                    "properties": {
976                        "inner": {
977                            "type": "object",
978                            "properties": {}
979                        }
980                    }
981                }
982            }
983        });
984
985        ensure_additional_properties_false(&mut value);
986
987        assert_eq!(value["additionalProperties"], Value::Bool(false));
988        assert_eq!(
989            value["properties"]["outer"]["additionalProperties"],
990            Value::Bool(false)
991        );
992        assert_eq!(
993            value["properties"]["outer"]["properties"]["inner"]["additionalProperties"],
994            Value::Bool(false)
995        );
996    }
997}