Skip to main content

axon/
backend.rs

1//! AXON Backend — Multi-provider LLM API clients.
2//!
3//! Supports: Anthropic, OpenAI, Gemini, Kimi, GLM, OpenRouter, Ollama.
4//! Uses blocking HTTP (reqwest::blocking) — suitable for CLI.
5//!
6//! Three API families:
7//!   - Anthropic: Claude Messages API (system field + x-api-key header)
8//!   - Gemini: Google generateContent (systemInstruction + API key in URL)
9//!   - OpenAI-compatible: OpenAI, Kimi, GLM, OpenRouter, Ollama (Bearer auth + chat/completions)
10//!
11//! API key sourced from environment: <PROVIDER>_API_KEY.
12//!
13//! # §Fase 33.x.i — Retirement-in-progress
14//!
15//! This module is the LEGACY synchronous LLM-call surface. The
16//! production async path uses [`crate::backends::Backend`] +
17//! [`crate::backends::Registry`] instead. As of v1.25.0 (33.x.i):
18//!
19//!   - [`SUPPORTED_BACKENDS`] is now a `pub use` re-export of the
20//!     consolidated [`crate::backends::CANONICAL_PROVIDERS`] —
21//!     single source of truth + drift-gated.
22//!   - [`get_api_key`] delegates to
23//!     [`crate::backends::get_api_key`] for the canonical env-var
24//!     resolution (kept here as a thin shim that wraps the result
25//!     in the legacy [`BackendError`] shape so existing callers
26//!     don't need to change types).
27//!   - The synchronous [`call`] / [`call_multi`] / [`call_stream`]
28//!     / [`call_multi_stream`] functions are marked
29//!     `#[deprecated]`. New code MUST use the async `Backend`
30//!     trait via the [`crate::backends::Registry`] surface.
31//!
32//! The 4 caller files (runner.rs, axon_server.rs,
33//! resilient_backend.rs, tenant_secrets.rs) carry a top-level
34//! `#![allow(deprecated)]` while the deeper async migration of
35//! the synchronous CLI/server paths progresses under a
36//! followup sub-fase (Fase 33.x.i.2 — sync→async migration of the
37//! 4 callers, separate from this consolidation cycle).
38
39#![allow(deprecated)]
40
41use serde_json::{json, Value};
42use std::io::{BufRead, BufReader};
43
/// Fallback output-token limit, used wherever this file evaluates
/// `max_tokens.unwrap_or(DEFAULT_MAX_TOKENS)` for a caller that passed `None`.
const DEFAULT_MAX_TOKENS: u32 = 4096;
45
46// ── Provider specifications ─────────────────────────────────────────────────
47
/// Static connection details for one supported provider, as returned by
/// `provider_spec`.
struct ProviderSpec {
    /// Name of the provider's API-key environment variable
    /// (e.g. `ANTHROPIC_API_KEY`). Not read anywhere in the visible part of
    /// this file; key lookup goes through `crate::backends::get_api_key`.
    env_var: &'static str,
    /// URL prefix; each API family appends its own endpoint path to it.
    base_url: &'static str,
    /// Model identifier placed in every request built from this spec.
    default_model: &'static str,
    /// Wire protocol, used to select the request/response handling code.
    api_family: ApiFamily,
}
54
/// The wire protocol a provider speaks; selects which request builder and
/// response parser handle a call.
///
/// `Debug` is derived so values can appear in log/assert output, and `Eq`
/// accompanies `PartialEq` because equality on a field-less enum is total.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ApiFamily {
    /// Anthropic Messages API.
    Anthropic,
    /// Google Gemini `generateContent` API.
    Gemini,
    /// Any provider exposing an OpenAI-style `chat/completions` endpoint.
    OpenAICompatible,
}
61
62fn provider_spec(name: &str) -> Option<ProviderSpec> {
63    match name {
64        "anthropic" => Some(ProviderSpec {
65            env_var: "ANTHROPIC_API_KEY",
66            base_url: "https://api.anthropic.com",
67            default_model: "claude-sonnet-4-20250514",
68            api_family: ApiFamily::Anthropic,
69        }),
70        "openai" => Some(ProviderSpec {
71            env_var: "OPENAI_API_KEY",
72            base_url: "https://api.openai.com",
73            default_model: "gpt-4o-mini",
74            api_family: ApiFamily::OpenAICompatible,
75        }),
76        "gemini" => Some(ProviderSpec {
77            env_var: "GEMINI_API_KEY",
78            base_url: "https://generativelanguage.googleapis.com",
79            default_model: "gemini-2.0-flash",
80            api_family: ApiFamily::Gemini,
81        }),
82        "kimi" => Some(ProviderSpec {
83            env_var: "KIMI_API_KEY",
84            base_url: "https://api.moonshot.ai",
85            default_model: "moonshot-v1-8k",
86            api_family: ApiFamily::OpenAICompatible,
87        }),
88        "glm" => Some(ProviderSpec {
89            env_var: "GLM_API_KEY",
90            base_url: "https://open.bigmodel.cn/api/paas",
91            default_model: "glm-4-flash",
92            api_family: ApiFamily::OpenAICompatible,
93        }),
94        "openrouter" => Some(ProviderSpec {
95            env_var: "OPENROUTER_API_KEY",
96            base_url: "https://openrouter.ai/api",
97            default_model: "anthropic/claude-sonnet-4",
98            api_family: ApiFamily::OpenAICompatible,
99        }),
100        "ollama" => Some(ProviderSpec {
101            env_var: "OLLAMA_API_KEY",  // Optional for local
102            base_url: "http://localhost:11434",
103            default_model: "llama3.2",
104            api_family: ApiFamily::OpenAICompatible,
105        }),
106        _ => None,
107    }
108}
109
110/// §Fase 33.x.i — Re-export the consolidated single source of truth.
111///
112/// The canonical 7-provider list lives in
113/// [`crate::backends::CANONICAL_PROVIDERS`]; this re-export keeps
114/// existing call sites compiling without churn while the deeper
115/// async migration (Fase 33.x.i.2) progresses. Drift-gated by
116/// `backends::resolver_tests::canonical_providers_equals_legacy_supported_backends`
117/// + `tests/fase33x_i_mono_file_retirement.rs`.
118pub use crate::backends::CANONICAL_PROVIDERS as SUPPORTED_BACKENDS;
119
120// ── Public types ────────────────────────────────────────────────────────────
121
/// Result of a single model call.
///
/// Built by the per-family call/stream functions in this file from the
/// provider's response.
#[derive(Debug)]
pub struct ModelResponse {
    /// Generated text (for streaming calls, the accumulation of all chunks).
    pub text: String,
    /// Model name reported by the provider, or the spec's default model when
    /// the response does not carry one.
    pub model: String,
    /// Prompt-side token count; `0` when the response did not report it.
    pub input_tokens: u64,
    /// Completion-side token count; `0` when the response did not report it.
    pub output_tokens: u64,
    /// Provider-specific reason the generation ended, passed through as-is.
    pub stop_reason: String,
}
131
/// Error from backend API call.
///
/// Carries a single human-readable message. `Display` prints that message
/// verbatim, and the type implements [`std::error::Error`] so it can be boxed
/// as `dyn Error` or propagated with `?` into generic error types.
#[derive(Debug)]
pub struct BackendError {
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl std::fmt::Display for BackendError {
    /// Writes `message` unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

// No underlying cause is tracked, so the default `source()` (None) is correct.
impl std::error::Error for BackendError {}
143
144// ── Public API ──────────────────────────────────────────────────────────────
145
146/// §Fase 33.x.i — Get the API key for a given backend from
147/// environment.
148///
149/// This is a thin shim around [`crate::backends::get_api_key`]
150/// (the consolidated source of truth) that wraps the result in
151/// the legacy [`BackendError`] shape so existing callers keep
152/// compiling without touching their error-handling code. New
153/// code should call [`crate::backends::get_api_key`] directly +
154/// use the trait-side error types.
155pub fn get_api_key(backend: &str) -> Result<String, BackendError> {
156    crate::backends::get_api_key(backend).map_err(|message| BackendError { message })
157}
158
159/// Call the LLM API for the given backend (blocking).
160/// Dispatches to the correct API family based on provider spec.
161///
162/// §Fase 33.x.i — DEPRECATED. New code MUST use the async
163/// [`crate::backends::Backend`] trait via
164/// [`crate::backends::Registry`]. The 4 caller files of this
165/// function carry `#![allow(deprecated)]` while the deeper async
166/// migration progresses under Fase 33.x.i.2.
167#[deprecated(
168    since = "1.25.0",
169    note = "use crate::backends::Registry::get(name)?.complete() instead; \
170            33.x.i.2 closes the 4-caller sync→async migration"
171)]
172pub fn call(
173    backend: &str,
174    api_key: &str,
175    system_prompt: &str,
176    user_prompt: &str,
177    max_tokens: Option<u32>,
178) -> Result<ModelResponse, BackendError> {
179    let spec = provider_spec(backend).ok_or_else(|| BackendError {
180        message: format!(
181            "Unknown backend '{backend}'. Supported: {}",
182            SUPPORTED_BACKENDS.join(", ")
183        ),
184    })?;
185
186    let start = std::time::Instant::now();
187    tracing::info!(
188        backend = backend,
189        model = spec.default_model,
190        max_tokens = max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
191        "llm_call_started"
192    );
193
194    let result = match spec.api_family {
195        ApiFamily::Anthropic => call_anthropic(&spec, api_key, system_prompt, user_prompt, max_tokens),
196        ApiFamily::Gemini => call_gemini(&spec, api_key, system_prompt, user_prompt, max_tokens),
197        ApiFamily::OpenAICompatible => call_openai_compat(&spec, api_key, system_prompt, user_prompt, max_tokens),
198    };
199
200    let latency_ms = start.elapsed().as_millis() as u64;
201
202    match &result {
203        Ok(resp) => {
204            tracing::info!(
205                backend = backend,
206                model = %resp.model,
207                latency_ms = latency_ms,
208                input_tokens = resp.input_tokens,
209                output_tokens = resp.output_tokens,
210                stop_reason = %resp.stop_reason,
211                "llm_call_completed"
212            );
213        }
214        Err(e) => {
215            tracing::error!(
216                backend = backend,
217                latency_ms = latency_ms,
218                error = %e,
219                "llm_call_failed"
220            );
221        }
222    }
223
224    result
225}
226
227/// Call the LLM API with streaming — text chunks arrive via `on_chunk` callback.
228/// Returns the complete `ModelResponse` after the stream ends.
229/// Anchor checking and other post-processing run on the accumulated text.
230/// §Fase 33.x.i — DEPRECATED single-message streaming call.
231/// New code MUST use [`crate::backends::Backend::stream`] instead.
232#[deprecated(
233    since = "1.25.0",
234    note = "use crate::backends::Registry::get(name)?.stream() instead; \
235            33.x.i.2 closes the sync→async migration"
236)]
237pub fn call_stream<F>(
238    backend: &str,
239    api_key: &str,
240    system_prompt: &str,
241    user_prompt: &str,
242    max_tokens: Option<u32>,
243    on_chunk: F,
244) -> Result<ModelResponse, BackendError>
245where
246    F: FnMut(&str),
247{
248    let spec = provider_spec(backend).ok_or_else(|| BackendError {
249        message: format!(
250            "Unknown backend '{backend}'. Supported: {}",
251            SUPPORTED_BACKENDS.join(", ")
252        ),
253    })?;
254
255    let start = std::time::Instant::now();
256    tracing::info!(
257        backend = backend,
258        model = spec.default_model,
259        streaming = true,
260        "llm_stream_started"
261    );
262
263    let result = match spec.api_family {
264        ApiFamily::Anthropic => stream_anthropic(&spec, api_key, system_prompt, user_prompt, max_tokens, on_chunk),
265        ApiFamily::Gemini => stream_gemini(&spec, api_key, system_prompt, user_prompt, max_tokens, on_chunk),
266        ApiFamily::OpenAICompatible => stream_openai_compat(&spec, api_key, system_prompt, user_prompt, max_tokens, on_chunk),
267    };
268
269    let latency_ms = start.elapsed().as_millis() as u64;
270
271    match &result {
272        Ok(resp) => {
273            tracing::info!(
274                backend = backend,
275                model = %resp.model,
276                latency_ms = latency_ms,
277                input_tokens = resp.input_tokens,
278                output_tokens = resp.output_tokens,
279                "llm_stream_completed"
280            );
281        }
282        Err(e) => {
283            tracing::error!(
284                backend = backend,
285                latency_ms = latency_ms,
286                error = %e,
287                "llm_stream_failed"
288            );
289        }
290    }
291
292    result
293}
294
295/// Call the LLM API with conversation history (blocking).
296/// The `messages` slice contains prior user/assistant turns. The current
297/// `user_prompt` is appended as the final user message.
298/// §Fase 33.x.i — DEPRECATED multi-turn synchronous call.
299/// New code MUST use [`crate::backends::Backend::complete`] with
300/// a populated `ChatRequest::messages` history.
301#[deprecated(
302    since = "1.25.0",
303    note = "use crate::backends::Registry::get(name)?.complete() with \
304            ChatRequest::messages history; 33.x.i.2 closes the migration"
305)]
306pub fn call_multi(
307    backend: &str,
308    api_key: &str,
309    system_prompt: &str,
310    messages: &[crate::conversation::Message],
311    user_prompt: &str,
312    max_tokens: Option<u32>,
313) -> Result<ModelResponse, BackendError> {
314    let spec = provider_spec(backend).ok_or_else(|| BackendError {
315        message: format!(
316            "Unknown backend '{backend}'. Supported: {}",
317            SUPPORTED_BACKENDS.join(", ")
318        ),
319    })?;
320
321    let msgs_json = build_messages_json(&spec, messages, user_prompt);
322
323    let start = std::time::Instant::now();
324    let turn_count = messages.len() + 1;
325    tracing::info!(
326        backend = backend,
327        model = spec.default_model,
328        turns = turn_count,
329        "llm_multi_call_started"
330    );
331
332    let result = match spec.api_family {
333        ApiFamily::Anthropic => call_anthropic_multi(&spec, api_key, system_prompt, &msgs_json, max_tokens),
334        ApiFamily::Gemini => call_gemini_multi(&spec, api_key, system_prompt, &msgs_json, max_tokens),
335        ApiFamily::OpenAICompatible => call_openai_multi(&spec, api_key, system_prompt, &msgs_json, max_tokens),
336    };
337
338    let latency_ms = start.elapsed().as_millis() as u64;
339
340    match &result {
341        Ok(resp) => {
342            tracing::info!(
343                backend = backend,
344                model = %resp.model,
345                turns = turn_count,
346                latency_ms = latency_ms,
347                input_tokens = resp.input_tokens,
348                output_tokens = resp.output_tokens,
349                "llm_multi_call_completed"
350            );
351        }
352        Err(e) => {
353            tracing::error!(
354                backend = backend,
355                turns = turn_count,
356                latency_ms = latency_ms,
357                error = %e,
358                "llm_multi_call_failed"
359            );
360        }
361    }
362
363    result
364}
365
366/// Call the LLM API with conversation history and streaming.
367/// §Fase 33.x.i — DEPRECATED multi-turn streaming call.
368/// New code MUST use [`crate::backends::Backend::stream`] with a
369/// populated `ChatRequest::messages` history.
370#[deprecated(
371    since = "1.25.0",
372    note = "use crate::backends::Registry::get(name)?.stream() with \
373            ChatRequest::messages history; 33.x.i.2 closes the migration"
374)]
375pub fn call_multi_stream<F>(
376    backend: &str,
377    api_key: &str,
378    system_prompt: &str,
379    messages: &[crate::conversation::Message],
380    user_prompt: &str,
381    max_tokens: Option<u32>,
382    on_chunk: F,
383) -> Result<ModelResponse, BackendError>
384where
385    F: FnMut(&str),
386{
387    let spec = provider_spec(backend).ok_or_else(|| BackendError {
388        message: format!(
389            "Unknown backend '{backend}'. Supported: {}",
390            SUPPORTED_BACKENDS.join(", ")
391        ),
392    })?;
393
394    let msgs_json = build_messages_json(&spec, messages, user_prompt);
395
396    let start = std::time::Instant::now();
397    let turn_count = messages.len() + 1;
398    tracing::info!(
399        backend = backend,
400        model = spec.default_model,
401        turns = turn_count,
402        streaming = true,
403        "llm_multi_stream_started"
404    );
405
406    let result = match spec.api_family {
407        ApiFamily::Anthropic => stream_anthropic_multi(&spec, api_key, system_prompt, &msgs_json, max_tokens, on_chunk),
408        ApiFamily::Gemini => stream_gemini_multi(&spec, api_key, system_prompt, &msgs_json, max_tokens, on_chunk),
409        ApiFamily::OpenAICompatible => stream_openai_multi(&spec, api_key, system_prompt, &msgs_json, max_tokens, on_chunk),
410    };
411
412    let latency_ms = start.elapsed().as_millis() as u64;
413
414    match &result {
415        Ok(resp) => {
416            tracing::info!(
417                backend = backend,
418                model = %resp.model,
419                turns = turn_count,
420                latency_ms = latency_ms,
421                input_tokens = resp.input_tokens,
422                output_tokens = resp.output_tokens,
423                "llm_multi_stream_completed"
424            );
425        }
426        Err(e) => {
427            tracing::error!(
428                backend = backend,
429                turns = turn_count,
430                latency_ms = latency_ms,
431                error = %e,
432                "llm_multi_stream_failed"
433            );
434        }
435    }
436
437    result
438}
439
440/// Build the messages JSON array from conversation history + current user prompt.
441/// Format depends on API family:
442///   - Anthropic/OpenAI: `[{role, content}, ...]`
443///   - Gemini: `[{role, parts: [{text}]}, ...]` with "model" instead of "assistant"
444fn build_messages_json(
445    spec: &ProviderSpec,
446    messages: &[crate::conversation::Message],
447    user_prompt: &str,
448) -> Vec<Value> {
449    let mut msgs: Vec<Value> = Vec::with_capacity(messages.len() + 1);
450
451    if spec.api_family == ApiFamily::Gemini {
452        for m in messages {
453            let role = if m.role == "assistant" { "model" } else { &m.role };
454            msgs.push(json!({"role": role, "parts": [{"text": &m.content}]}));
455        }
456        msgs.push(json!({"role": "user", "parts": [{"text": user_prompt}]}));
457    } else {
458        for m in messages {
459            msgs.push(json!({"role": &m.role, "content": &m.content}));
460        }
461        msgs.push(json!({"role": "user", "content": user_prompt}));
462    }
463
464    msgs
465}
466
467// ── SSE line parser ────────────────────────────────────────────────────────
468
469/// Parse SSE event stream from a reader, extracting text chunks.
470/// Calls `extract_text` on each JSON data line to get the text delta.
471/// Returns (accumulated_text, model, input_tokens, output_tokens, stop_reason).
472fn parse_sse_stream<R, F, E>(
473    reader: R,
474    mut on_chunk: F,
475    extract_text: E,
476) -> Result<(String, String, u64, u64, String), BackendError>
477where
478    R: std::io::Read,
479    F: FnMut(&str),
480    E: Fn(&Value) -> SseExtract,
481{
482    let buf = BufReader::new(reader);
483    let mut full_text = String::new();
484    let mut model = String::new();
485    let mut input_tokens: u64 = 0;
486    let mut output_tokens: u64 = 0;
487    let mut stop_reason = "unknown".to_string();
488
489    for line in buf.lines() {
490        let line = line.map_err(|e| BackendError {
491            message: format!("Stream read error: {e}"),
492        })?;
493
494        let line = line.trim_end();
495
496        // SSE format: "data: {...}"
497        if let Some(data) = line.strip_prefix("data: ") {
498            if data == "[DONE]" {
499                break;
500            }
501            if let Ok(json) = serde_json::from_str::<Value>(data) {
502                match extract_text(&json) {
503                    SseExtract::Text(text) => {
504                        on_chunk(&text);
505                        full_text.push_str(&text);
506                    }
507                    SseExtract::Meta { m, it, ot, sr } => {
508                        if !m.is_empty() { model = m; }
509                        if it > 0 { input_tokens = it; }
510                        if ot > 0 { output_tokens = ot; }
511                        if !sr.is_empty() { stop_reason = sr; }
512                    }
513                    SseExtract::None => {}
514                }
515            }
516        }
517    }
518
519    Ok((full_text, model, input_tokens, output_tokens, stop_reason))
520}
521
/// What a single SSE data line yields.
///
/// Produced by the per-provider extractor closures and consumed by
/// `parse_sse_stream`.
enum SseExtract {
    /// A text chunk to append.
    Text(String),
    /// Metadata update: `m` = model name, `it` = input tokens,
    /// `ot` = output tokens, `sr` = stop reason. An empty string or a zero
    /// count means "no update" and leaves the previously stored value alone.
    Meta { m: String, it: u64, ot: u64, sr: String },
    /// Nothing useful in this line.
    None,
}
531
532// ── Streaming: Anthropic ───────────────────────────────────────────────────
533
534fn stream_anthropic<F>(
535    spec: &ProviderSpec,
536    api_key: &str,
537    system_prompt: &str,
538    user_prompt: &str,
539    max_tokens: Option<u32>,
540    on_chunk: F,
541) -> Result<ModelResponse, BackendError>
542where
543    F: FnMut(&str),
544{
545    let url = format!("{}/v1/messages", spec.base_url);
546    let body = json!({
547        "model": spec.default_model,
548        "max_tokens": max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
549        "stream": true,
550        "system": system_prompt,
551        "messages": [{"role": "user", "content": user_prompt}]
552    });
553
554    let response = http_post_stream(
555        &url,
556        &[
557            ("x-api-key", api_key),
558            ("anthropic-version", "2023-06-01"),
559            ("content-type", "application/json"),
560        ],
561        &body,
562    )?;
563
564    let (text, model, input_tokens, output_tokens, stop_reason) =
565        parse_sse_stream(response, on_chunk, |json| {
566            let event_type = json["type"].as_str().unwrap_or("");
567            match event_type {
568                "content_block_delta" => {
569                    if let Some(text) = json["delta"]["text"].as_str() {
570                        SseExtract::Text(text.to_string())
571                    } else {
572                        SseExtract::None
573                    }
574                }
575                "message_start" => {
576                    let m = json["message"]["model"].as_str().unwrap_or("").to_string();
577                    let it = json["message"]["usage"]["input_tokens"].as_u64().unwrap_or(0);
578                    SseExtract::Meta { m, it, ot: 0, sr: String::new() }
579                }
580                "message_delta" => {
581                    let ot = json["usage"]["output_tokens"].as_u64().unwrap_or(0);
582                    let sr = json["delta"]["stop_reason"].as_str().unwrap_or("").to_string();
583                    SseExtract::Meta { m: String::new(), it: 0, ot, sr }
584                }
585                _ => SseExtract::None,
586            }
587        })?;
588
589    let model = if model.is_empty() { spec.default_model.to_string() } else { model };
590
591    Ok(ModelResponse { text, model, input_tokens, output_tokens, stop_reason })
592}
593
594// ── Streaming: Gemini ──────────────────────────────────────────────────────
595
596fn stream_gemini<F>(
597    spec: &ProviderSpec,
598    api_key: &str,
599    system_prompt: &str,
600    user_prompt: &str,
601    _max_tokens: Option<u32>,
602    mut on_chunk: F,
603) -> Result<ModelResponse, BackendError>
604where
605    F: FnMut(&str),
606{
607    // Gemini streaming uses streamGenerateContent endpoint
608    let url = format!(
609        "{}/v1beta/models/{}:streamGenerateContent?alt=sse&key={}",
610        spec.base_url, spec.default_model, api_key
611    );
612    let body = json!({
613        "systemInstruction": {"parts": [{"text": system_prompt}]},
614        "contents": [{"parts": [{"text": user_prompt}]}]
615    });
616
617    let response = http_post_stream(
618        &url,
619        &[("content-type", "application/json")],
620        &body,
621    )?;
622
623    let (text, _, input_tokens, output_tokens, stop_reason) =
624        parse_sse_stream(response, &mut on_chunk, |json| {
625            // Gemini SSE: each event is a generateContent response chunk
626            if let Some(text) = json["candidates"]
627                .as_array()
628                .and_then(|c| c.first())
629                .and_then(|c| c["content"]["parts"].as_array())
630                .and_then(|p| p.first())
631                .and_then(|p| p["text"].as_str())
632            {
633                return SseExtract::Text(text.to_string());
634            }
635
636            // Check for usage metadata in final chunk
637            let it = json["usageMetadata"]["promptTokenCount"].as_u64().unwrap_or(0);
638            let ot = json["usageMetadata"]["candidatesTokenCount"].as_u64().unwrap_or(0);
639            let sr = json["candidates"]
640                .as_array()
641                .and_then(|c| c.first())
642                .and_then(|c| c["finishReason"].as_str())
643                .unwrap_or("")
644                .to_string();
645
646            if it > 0 || ot > 0 || !sr.is_empty() {
647                SseExtract::Meta { m: String::new(), it, ot, sr }
648            } else {
649                SseExtract::None
650            }
651        })?;
652
653    Ok(ModelResponse {
654        text,
655        model: spec.default_model.to_string(),
656        input_tokens,
657        output_tokens,
658        stop_reason: if stop_reason == "unknown" { "STOP".to_string() } else { stop_reason },
659    })
660}
661
662// ── Streaming: OpenAI-compatible ───────────────────────────────────────────
663
664fn stream_openai_compat<F>(
665    spec: &ProviderSpec,
666    api_key: &str,
667    system_prompt: &str,
668    user_prompt: &str,
669    max_tokens: Option<u32>,
670    on_chunk: F,
671) -> Result<ModelResponse, BackendError>
672where
673    F: FnMut(&str),
674{
675    let url = format!("{}/v1/chat/completions", spec.base_url);
676    let body = json!({
677        "model": spec.default_model,
678        "max_tokens": max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
679        "temperature": 0,
680        "stream": true,
681        "messages": [
682            {"role": "system", "content": system_prompt},
683            {"role": "user", "content": user_prompt}
684        ]
685    });
686
687    let mut headers: Vec<(&str, &str)> = vec![("content-type", "application/json")];
688    let auth_header;
689    if !api_key.is_empty() {
690        auth_header = format!("Bearer {api_key}");
691        headers.push(("authorization", &auth_header));
692    }
693
694    let response = http_post_stream(&url, &headers, &body)?;
695
696    let (text, model, input_tokens, output_tokens, stop_reason) =
697        parse_sse_stream(response, on_chunk, |json| {
698            // OpenAI streaming: choices[0].delta.content
699            if let Some(text) = json["choices"]
700                .as_array()
701                .and_then(|c| c.first())
702                .and_then(|c| c["delta"]["content"].as_str())
703            {
704                if !text.is_empty() {
705                    return SseExtract::Text(text.to_string());
706                }
707            }
708
709            // Model name from first chunk
710            let m = json["model"].as_str().unwrap_or("").to_string();
711
712            // Usage in final chunk (OpenAI includes it with stream_options)
713            let it = json["usage"]["prompt_tokens"].as_u64().unwrap_or(0);
714            let ot = json["usage"]["completion_tokens"].as_u64().unwrap_or(0);
715
716            // Stop reason
717            let sr = json["choices"]
718                .as_array()
719                .and_then(|c| c.first())
720                .and_then(|c| c["finish_reason"].as_str())
721                .unwrap_or("")
722                .to_string();
723
724            if !m.is_empty() || it > 0 || ot > 0 || !sr.is_empty() {
725                SseExtract::Meta { m, it, ot, sr }
726            } else {
727                SseExtract::None
728            }
729        })?;
730
731    let model = if model.is_empty() { spec.default_model.to_string() } else { model };
732
733    Ok(ModelResponse { text, model, input_tokens, output_tokens, stop_reason })
734}
735
736// ── Anthropic Messages API ──────────────────────────────────────────────────
737
738fn call_anthropic(
739    spec: &ProviderSpec,
740    api_key: &str,
741    system_prompt: &str,
742    user_prompt: &str,
743    max_tokens: Option<u32>,
744) -> Result<ModelResponse, BackendError> {
745    let url = format!("{}/v1/messages", spec.base_url);
746    let body = json!({
747        "model": spec.default_model,
748        "max_tokens": max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
749        "system": system_prompt,
750        "messages": [{"role": "user", "content": user_prompt}]
751    });
752
753    let response = http_post(
754        &url,
755        &[
756            ("x-api-key", api_key),
757            ("anthropic-version", "2023-06-01"),
758            ("content-type", "application/json"),
759        ],
760        &body,
761    )?;
762
763    let text = response["content"]
764        .as_array()
765        .and_then(|blocks| {
766            blocks.iter()
767                .filter_map(|b| {
768                    if b["type"].as_str() == Some("text") {
769                        b["text"].as_str().map(|s| s.to_string())
770                    } else { None }
771                })
772                .reduce(|a, b| format!("{a}\n{b}"))
773        })
774        .unwrap_or_default();
775
776    Ok(ModelResponse {
777        text,
778        model: response["model"].as_str().unwrap_or(spec.default_model).to_string(),
779        input_tokens: response["usage"]["input_tokens"].as_u64().unwrap_or(0),
780        output_tokens: response["usage"]["output_tokens"].as_u64().unwrap_or(0),
781        stop_reason: response["stop_reason"].as_str().unwrap_or("unknown").to_string(),
782    })
783}
784
785// ── Gemini generateContent API ──────────────────────────────────────────────
786
787fn call_gemini(
788    spec: &ProviderSpec,
789    api_key: &str,
790    system_prompt: &str,
791    user_prompt: &str,
792    _max_tokens: Option<u32>,
793) -> Result<ModelResponse, BackendError> {
794    let url = format!(
795        "{}/v1beta/models/{}:generateContent?key={}",
796        spec.base_url, spec.default_model, api_key
797    );
798    let body = json!({
799        "systemInstruction": {"parts": [{"text": system_prompt}]},
800        "contents": [{"parts": [{"text": user_prompt}]}]
801    });
802
803    let response = http_post(
804        &url,
805        &[("content-type", "application/json")],
806        &body,
807    )?;
808
809    // Extract text from candidates[0].content.parts[0].text
810    let text = response["candidates"]
811        .as_array()
812        .and_then(|cands| cands.first())
813        .and_then(|c| c["content"]["parts"].as_array())
814        .and_then(|parts| parts.first())
815        .and_then(|p| p["text"].as_str())
816        .unwrap_or_default()
817        .to_string();
818
819    // Gemini usage is in usageMetadata
820    let input_tokens = response["usageMetadata"]["promptTokenCount"].as_u64().unwrap_or(0);
821    let output_tokens = response["usageMetadata"]["candidatesTokenCount"].as_u64().unwrap_or(0);
822    let stop_reason = response["candidates"]
823        .as_array()
824        .and_then(|c| c.first())
825        .and_then(|c| c["finishReason"].as_str())
826        .unwrap_or("unknown")
827        .to_string();
828
829    Ok(ModelResponse {
830        text,
831        model: spec.default_model.to_string(),
832        input_tokens,
833        output_tokens,
834        stop_reason,
835    })
836}
837
838// ── OpenAI-compatible chat/completions API ──────────────────────────────────
839// Covers: OpenAI, Kimi, GLM, OpenRouter, Ollama
840
841fn call_openai_compat(
842    spec: &ProviderSpec,
843    api_key: &str,
844    system_prompt: &str,
845    user_prompt: &str,
846    max_tokens: Option<u32>,
847) -> Result<ModelResponse, BackendError> {
848    let url = format!("{}/v1/chat/completions", spec.base_url);
849    let body = json!({
850        "model": spec.default_model,
851        "max_tokens": max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
852        "temperature": 0,
853        "messages": [
854            {"role": "system", "content": system_prompt},
855            {"role": "user", "content": user_prompt}
856        ]
857    });
858
859    let mut headers: Vec<(&str, &str)> = vec![("content-type", "application/json")];
860    let auth_header;
861    if !api_key.is_empty() {
862        auth_header = format!("Bearer {api_key}");
863        headers.push(("authorization", &auth_header));
864    }
865
866    let response = http_post(&url, &headers, &body)?;
867
868    // Extract text from choices[0].message.content
869    let text = response["choices"]
870        .as_array()
871        .and_then(|choices| choices.first())
872        .and_then(|c| c["message"]["content"].as_str())
873        .unwrap_or_default()
874        .to_string();
875
876    let model = response["model"].as_str().unwrap_or(spec.default_model).to_string();
877    let input_tokens = response["usage"]["prompt_tokens"].as_u64().unwrap_or(0);
878    let output_tokens = response["usage"]["completion_tokens"].as_u64().unwrap_or(0);
879    let stop_reason = response["choices"]
880        .as_array()
881        .and_then(|c| c.first())
882        .and_then(|c| c["finish_reason"].as_str())
883        .unwrap_or("unknown")
884        .to_string();
885
886    Ok(ModelResponse {
887        text,
888        model,
889        input_tokens,
890        output_tokens,
891        stop_reason,
892    })
893}
894
895// ── HTTP helper ─────────────────────────────────────────────────────────────
896
897/// HTTP POST returning the raw response for streaming reads.
898fn http_post_stream(
899    url: &str,
900    headers: &[(&str, &str)],
901    body: &Value,
902) -> Result<reqwest::blocking::Response, BackendError> {
903    let client = reqwest::blocking::Client::new();
904    let mut request = client.post(url);
905
906    for (key, val) in headers {
907        request = request.header(*key, *val);
908    }
909
910    tracing::debug!(url = url, "http_post_stream_sending");
911
912    let response = request
913        .json(body)
914        .send()
915        .map_err(|e| {
916            tracing::error!(url = url, error = %e, "http_post_stream_network_error");
917            BackendError {
918                message: format!("HTTP request failed: {e}"),
919            }
920        })?;
921
922    let status = response.status();
923    if !status.is_success() {
924        let error_text = response.text().unwrap_or_default();
925        tracing::error!(url = url, status = status.as_u16(), "http_post_stream_api_error");
926        return Err(BackendError {
927            message: format!("API error ({status}): {error_text}"),
928        });
929    }
930
931    Ok(response)
932}
933
934// ── Multi-turn: Anthropic ──────────────────────────────────────────────────
935
936fn call_anthropic_multi(
937    spec: &ProviderSpec,
938    api_key: &str,
939    system_prompt: &str,
940    messages: &[Value],
941    max_tokens: Option<u32>,
942) -> Result<ModelResponse, BackendError> {
943    let url = format!("{}/v1/messages", spec.base_url);
944    let body = json!({
945        "model": spec.default_model,
946        "max_tokens": max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
947        "system": system_prompt,
948        "messages": messages
949    });
950
951    let response = http_post(
952        &url,
953        &[
954            ("x-api-key", api_key),
955            ("anthropic-version", "2023-06-01"),
956            ("content-type", "application/json"),
957        ],
958        &body,
959    )?;
960
961    let text = response["content"]
962        .as_array()
963        .and_then(|blocks| {
964            blocks.iter()
965                .filter_map(|b| {
966                    if b["type"].as_str() == Some("text") {
967                        b["text"].as_str().map(|s| s.to_string())
968                    } else { None }
969                })
970                .reduce(|a, b| format!("{a}\n{b}"))
971        })
972        .unwrap_or_default();
973
974    Ok(ModelResponse {
975        text,
976        model: response["model"].as_str().unwrap_or(spec.default_model).to_string(),
977        input_tokens: response["usage"]["input_tokens"].as_u64().unwrap_or(0),
978        output_tokens: response["usage"]["output_tokens"].as_u64().unwrap_or(0),
979        stop_reason: response["stop_reason"].as_str().unwrap_or("unknown").to_string(),
980    })
981}
982
983fn stream_anthropic_multi<F>(
984    spec: &ProviderSpec,
985    api_key: &str,
986    system_prompt: &str,
987    messages: &[Value],
988    max_tokens: Option<u32>,
989    on_chunk: F,
990) -> Result<ModelResponse, BackendError>
991where
992    F: FnMut(&str),
993{
994    let url = format!("{}/v1/messages", spec.base_url);
995    let body = json!({
996        "model": spec.default_model,
997        "max_tokens": max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
998        "stream": true,
999        "system": system_prompt,
1000        "messages": messages
1001    });
1002
1003    let response = http_post_stream(
1004        &url,
1005        &[
1006            ("x-api-key", api_key),
1007            ("anthropic-version", "2023-06-01"),
1008            ("content-type", "application/json"),
1009        ],
1010        &body,
1011    )?;
1012
1013    let (text, model, input_tokens, output_tokens, stop_reason) =
1014        parse_sse_stream(response, on_chunk, |json| {
1015            let event_type = json["type"].as_str().unwrap_or("");
1016            match event_type {
1017                "content_block_delta" => {
1018                    if let Some(text) = json["delta"]["text"].as_str() {
1019                        SseExtract::Text(text.to_string())
1020                    } else {
1021                        SseExtract::None
1022                    }
1023                }
1024                "message_start" => {
1025                    let m = json["message"]["model"].as_str().unwrap_or("").to_string();
1026                    let it = json["message"]["usage"]["input_tokens"].as_u64().unwrap_or(0);
1027                    SseExtract::Meta { m, it, ot: 0, sr: String::new() }
1028                }
1029                "message_delta" => {
1030                    let ot = json["usage"]["output_tokens"].as_u64().unwrap_or(0);
1031                    let sr = json["delta"]["stop_reason"].as_str().unwrap_or("").to_string();
1032                    SseExtract::Meta { m: String::new(), it: 0, ot, sr }
1033                }
1034                _ => SseExtract::None,
1035            }
1036        })?;
1037
1038    let model = if model.is_empty() { spec.default_model.to_string() } else { model };
1039    Ok(ModelResponse { text, model, input_tokens, output_tokens, stop_reason })
1040}
1041
1042// ── Multi-turn: Gemini ────────────────────────────────────────────────────
1043
1044fn call_gemini_multi(
1045    spec: &ProviderSpec,
1046    api_key: &str,
1047    system_prompt: &str,
1048    contents: &[Value],
1049    _max_tokens: Option<u32>,
1050) -> Result<ModelResponse, BackendError> {
1051    let url = format!(
1052        "{}/v1beta/models/{}:generateContent?key={}",
1053        spec.base_url, spec.default_model, api_key
1054    );
1055    let body = json!({
1056        "systemInstruction": {"parts": [{"text": system_prompt}]},
1057        "contents": contents
1058    });
1059
1060    let response = http_post(
1061        &url,
1062        &[("content-type", "application/json")],
1063        &body,
1064    )?;
1065
1066    let text = response["candidates"]
1067        .as_array()
1068        .and_then(|cands| cands.first())
1069        .and_then(|c| c["content"]["parts"].as_array())
1070        .and_then(|parts| parts.first())
1071        .and_then(|p| p["text"].as_str())
1072        .unwrap_or_default()
1073        .to_string();
1074
1075    let input_tokens = response["usageMetadata"]["promptTokenCount"].as_u64().unwrap_or(0);
1076    let output_tokens = response["usageMetadata"]["candidatesTokenCount"].as_u64().unwrap_or(0);
1077    let stop_reason = response["candidates"]
1078        .as_array()
1079        .and_then(|c| c.first())
1080        .and_then(|c| c["finishReason"].as_str())
1081        .unwrap_or("unknown")
1082        .to_string();
1083
1084    Ok(ModelResponse {
1085        text,
1086        model: spec.default_model.to_string(),
1087        input_tokens,
1088        output_tokens,
1089        stop_reason,
1090    })
1091}
1092
1093fn stream_gemini_multi<F>(
1094    spec: &ProviderSpec,
1095    api_key: &str,
1096    system_prompt: &str,
1097    contents: &[Value],
1098    _max_tokens: Option<u32>,
1099    mut on_chunk: F,
1100) -> Result<ModelResponse, BackendError>
1101where
1102    F: FnMut(&str),
1103{
1104    let url = format!(
1105        "{}/v1beta/models/{}:streamGenerateContent?alt=sse&key={}",
1106        spec.base_url, spec.default_model, api_key
1107    );
1108    let body = json!({
1109        "systemInstruction": {"parts": [{"text": system_prompt}]},
1110        "contents": contents
1111    });
1112
1113    let response = http_post_stream(
1114        &url,
1115        &[("content-type", "application/json")],
1116        &body,
1117    )?;
1118
1119    let (text, _model, input_tokens, output_tokens, stop_reason) =
1120        parse_sse_stream(response, &mut on_chunk, |json| {
1121            if let Some(text) = json["candidates"]
1122                .as_array()
1123                .and_then(|c| c.first())
1124                .and_then(|c| c["content"]["parts"].as_array())
1125                .and_then(|parts| parts.first())
1126                .and_then(|p| p["text"].as_str())
1127            {
1128                SseExtract::Text(text.to_string())
1129            } else if let Some(usage) = json.get("usageMetadata") {
1130                SseExtract::Meta {
1131                    m: String::new(),
1132                    it: usage["promptTokenCount"].as_u64().unwrap_or(0),
1133                    ot: usage["candidatesTokenCount"].as_u64().unwrap_or(0),
1134                    sr: json["candidates"]
1135                        .as_array()
1136                        .and_then(|c| c.first())
1137                        .and_then(|c| c["finishReason"].as_str())
1138                        .unwrap_or("")
1139                        .to_string(),
1140                }
1141            } else {
1142                SseExtract::None
1143            }
1144        })?;
1145
1146    Ok(ModelResponse {
1147        text,
1148        model: spec.default_model.to_string(),
1149        input_tokens,
1150        output_tokens,
1151        stop_reason,
1152    })
1153}
1154
1155// ── Multi-turn: OpenAI-compatible ─────────────────────────────────────────
1156
1157fn call_openai_multi(
1158    spec: &ProviderSpec,
1159    api_key: &str,
1160    system_prompt: &str,
1161    messages: &[Value],
1162    max_tokens: Option<u32>,
1163) -> Result<ModelResponse, BackendError> {
1164    let url = format!("{}/v1/chat/completions", spec.base_url);
1165
1166    // Prepend system message to the conversation messages
1167    let mut all_msgs = vec![json!({"role": "system", "content": system_prompt})];
1168    all_msgs.extend_from_slice(messages);
1169
1170    let body = json!({
1171        "model": spec.default_model,
1172        "max_tokens": max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
1173        "temperature": 0,
1174        "messages": all_msgs
1175    });
1176
1177    let response = http_post(
1178        &url,
1179        &[
1180            ("Authorization", &format!("Bearer {api_key}")),
1181            ("content-type", "application/json"),
1182        ],
1183        &body,
1184    )?;
1185
1186    let text = response["choices"]
1187        .as_array()
1188        .and_then(|c| c.first())
1189        .and_then(|c| c["message"]["content"].as_str())
1190        .unwrap_or_default()
1191        .to_string();
1192
1193    let model = response["model"].as_str().unwrap_or(spec.default_model).to_string();
1194    let input_tokens = response["usage"]["prompt_tokens"].as_u64().unwrap_or(0);
1195    let output_tokens = response["usage"]["completion_tokens"].as_u64().unwrap_or(0);
1196    let stop_reason = response["choices"]
1197        .as_array()
1198        .and_then(|c| c.first())
1199        .and_then(|c| c["finish_reason"].as_str())
1200        .unwrap_or("unknown")
1201        .to_string();
1202
1203    Ok(ModelResponse { text, model, input_tokens, output_tokens, stop_reason })
1204}
1205
1206fn stream_openai_multi<F>(
1207    spec: &ProviderSpec,
1208    api_key: &str,
1209    system_prompt: &str,
1210    messages: &[Value],
1211    max_tokens: Option<u32>,
1212    on_chunk: F,
1213) -> Result<ModelResponse, BackendError>
1214where
1215    F: FnMut(&str),
1216{
1217    let url = format!("{}/v1/chat/completions", spec.base_url);
1218
1219    let mut all_msgs = vec![json!({"role": "system", "content": system_prompt})];
1220    all_msgs.extend_from_slice(messages);
1221
1222    let body = json!({
1223        "model": spec.default_model,
1224        "max_tokens": max_tokens.unwrap_or(DEFAULT_MAX_TOKENS),
1225        "temperature": 0,
1226        "stream": true,
1227        "messages": all_msgs
1228    });
1229
1230    let response = http_post_stream(
1231        &url,
1232        &[
1233            ("Authorization", &format!("Bearer {api_key}")),
1234            ("content-type", "application/json"),
1235        ],
1236        &body,
1237    )?;
1238
1239    let (text, model, input_tokens, output_tokens, stop_reason) =
1240        parse_sse_stream(response, on_chunk, |json| {
1241            if let Some(delta) = json["choices"]
1242                .as_array()
1243                .and_then(|c| c.first())
1244                .and_then(|c| c["delta"]["content"].as_str())
1245            {
1246                SseExtract::Text(delta.to_string())
1247            } else {
1248                let m = json["model"].as_str().unwrap_or("").to_string();
1249                let sr = json["choices"]
1250                    .as_array()
1251                    .and_then(|c| c.first())
1252                    .and_then(|c| c["finish_reason"].as_str())
1253                    .unwrap_or("")
1254                    .to_string();
1255                if !m.is_empty() || !sr.is_empty() {
1256                    SseExtract::Meta { m, it: 0, ot: 0, sr }
1257                } else {
1258                    SseExtract::None
1259                }
1260            }
1261        })?;
1262
1263    let model = if model.is_empty() { spec.default_model.to_string() } else { model };
1264    Ok(ModelResponse { text, model, input_tokens, output_tokens, stop_reason })
1265}
1266
1267fn http_post(url: &str, headers: &[(&str, &str)], body: &Value) -> Result<Value, BackendError> {
1268    let client = reqwest::blocking::Client::new();
1269    let mut request = client.post(url);
1270
1271    for (key, val) in headers {
1272        request = request.header(*key, *val);
1273    }
1274
1275    tracing::debug!(url = url, "http_post_sending");
1276
1277    let response = request
1278        .json(body)
1279        .send()
1280        .map_err(|e| {
1281            tracing::error!(url = url, error = %e, "http_post_network_error");
1282            BackendError {
1283                message: format!("HTTP request failed: {e}"),
1284            }
1285        })?;
1286
1287    let status = response.status();
1288    let response_text = response.text().map_err(|e| BackendError {
1289        message: format!("Failed to read response: {e}"),
1290    })?;
1291
1292    if !status.is_success() {
1293        tracing::error!(url = url, status = status.as_u16(), "http_post_api_error");
1294        return Err(BackendError {
1295            message: format!("API error ({status}): {response_text}"),
1296        });
1297    }
1298
1299    serde_json::from_str(&response_text).map_err(|e| BackendError {
1300        message: format!("Failed to parse response JSON: {e}"),
1301    })
1302}
1303
1304// ── Tests ──────────────────────────────────────────────────────────────────
1305
#[cfg(test)]
mod tests {
    use super::*;

    /// Anthropic-shaped events: model and input tokens arrive in
    /// `message_start`, text in `content_block_delta`, output tokens and stop
    /// reason in `message_delta`.
    #[test]
    fn sse_parse_anthropic_stream() {
        let wire = concat!(
            r#"data: {"type":"message_start","message":{"model":"claude-sonnet-4-20250514","usage":{"input_tokens":42}}}"#,
            "\n\n",
            r#"data: {"type":"content_block_delta","delta":{"text":"Hello"}}"#,
            "\n\n",
            r#"data: {"type":"content_block_delta","delta":{"text":" world"}}"#,
            "\n\n",
            r#"data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":10}}"#,
            "\n\n",
        );
        let mut seen: Vec<String> = Vec::new();

        let (text, model, it, ot, sr) = parse_sse_stream(
            std::io::Cursor::new(wire.as_bytes()),
            |piece| seen.push(piece.to_string()),
            |json| match json["type"].as_str().unwrap_or("") {
                "content_block_delta" => match json["delta"]["text"].as_str() {
                    Some(t) => SseExtract::Text(t.to_string()),
                    None => SseExtract::None,
                },
                "message_start" => SseExtract::Meta {
                    m: json["message"]["model"].as_str().unwrap_or("").to_string(),
                    it: json["message"]["usage"]["input_tokens"].as_u64().unwrap_or(0),
                    ot: 0,
                    sr: String::new(),
                },
                "message_delta" => SseExtract::Meta {
                    m: String::new(),
                    it: 0,
                    ot: json["usage"]["output_tokens"].as_u64().unwrap_or(0),
                    sr: json["delta"]["stop_reason"].as_str().unwrap_or("").to_string(),
                },
                _ => SseExtract::None,
            },
        )
        .unwrap();

        assert_eq!(text, "Hello world");
        assert_eq!(seen, vec!["Hello", " world"]);
        assert_eq!(model, "claude-sonnet-4-20250514");
        assert_eq!(it, 42);
        assert_eq!(ot, 10);
        assert_eq!(sr, "end_turn");
    }

    /// OpenAI-shaped chunks: the first chunk names the model and role, the
    /// middle chunks carry content, the last carries finish reason and usage,
    /// followed by the `[DONE]` sentinel.
    #[test]
    fn sse_parse_openai_stream() {
        let wire = concat!(
            r#"data: {"model":"gpt-4o-mini","choices":[{"delta":{"role":"assistant"}}]}"#,
            "\n\n",
            r#"data: {"choices":[{"delta":{"content":"Hi"}}]}"#,
            "\n\n",
            r#"data: {"choices":[{"delta":{"content":" there"}}]}"#,
            "\n\n",
            r#"data: {"choices":[{"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":5,"completion_tokens":2}}"#,
            "\n\n",
            "data: [DONE]\n",
        );
        let mut seen: Vec<String> = Vec::new();

        let (text, model, it, ot, sr) = parse_sse_stream(
            std::io::Cursor::new(wire.as_bytes()),
            |piece| seen.push(piece.to_string()),
            |json| {
                let choice = json["choices"].as_array().and_then(|c| c.first());

                // Non-empty content wins; everything else is treated as metadata.
                match choice.and_then(|c| c["delta"]["content"].as_str()) {
                    Some(t) if !t.is_empty() => return SseExtract::Text(t.to_string()),
                    _ => {}
                }

                let m = json["model"].as_str().unwrap_or("").to_string();
                let prompt = json["usage"]["prompt_tokens"].as_u64().unwrap_or(0);
                let completion = json["usage"]["completion_tokens"].as_u64().unwrap_or(0);
                let sr = choice
                    .and_then(|c| c["finish_reason"].as_str())
                    .unwrap_or("")
                    .to_string();

                let has_meta = !m.is_empty() || prompt > 0 || completion > 0 || !sr.is_empty();
                if has_meta {
                    SseExtract::Meta { m, it: prompt, ot: completion, sr }
                } else {
                    SseExtract::None
                }
            },
        )
        .unwrap();

        assert_eq!(text, "Hi there");
        assert_eq!(seen, vec!["Hi", " there"]);
        assert_eq!(model, "gpt-4o-mini");
        assert_eq!(it, 5);
        assert_eq!(ot, 2);
        assert_eq!(sr, "stop");
    }

    /// A stream holding only the `[DONE]` sentinel produces no text and never
    /// invokes the chunk callback.
    #[test]
    fn sse_parse_empty_stream() {
        let wire = "data: [DONE]\n";
        let mut calls = 0;

        let (text, _, _, _, _) = parse_sse_stream(
            std::io::Cursor::new(wire.as_bytes()),
            |_| calls += 1,
            |_| SseExtract::None,
        )
        .unwrap();

        assert_eq!(text, "");
        assert_eq!(calls, 0);
    }

    /// Comment, `event:` and `retry:` lines are skipped; only the `data:`
    /// line contributes text.
    #[test]
    fn sse_parse_ignores_non_data_lines() {
        let wire = concat!(
            ": comment line\n",
            "event: ping\n",
            r#"data: {"type":"content_block_delta","delta":{"text":"ok"}}"#,
            "\n\n",
            "retry: 1000\n",
        );
        let mut seen: Vec<String> = Vec::new();

        let (text, _, _, _, _) = parse_sse_stream(
            std::io::Cursor::new(wire.as_bytes()),
            |piece| seen.push(piece.to_string()),
            |json| match json["delta"]["text"].as_str() {
                Some(t) => SseExtract::Text(t.to_string()),
                None => SseExtract::None,
            },
        )
        .unwrap();

        assert_eq!(text, "ok");
        assert_eq!(seen, vec!["ok"]);
    }
}