Skip to main content

reddb_server/
ai.rs

//! External AI provider integration primitives.
//!
//! This module supports OpenAI-compatible embeddings and chat prompts
//! (blocking, async, and streaming) as well as Anthropic message prompts.
//! It is intended to be consumed by server handlers and future
//! query/runtime integrations.
5
6use std::io::BufRead;
7use std::time::Duration;
8
9use crate::json::{parse_json, Map, Value as JsonValue};
10use crate::{RedDBError, RedDBResult};
11
12/// Shared HTTP helper for every outbound AI provider call. Centralises
13/// the ureq 3.x builder and error conversion. Returns
14/// `(status, body)` even on 4xx/5xx (via
15/// `http_status_as_error(false)`) so callers can format a
16/// provider-specific error without re-plumbing the 3.x error enum.
17fn http_post_json(
18    url: &str,
19    api_key: &str,
20    extra_headers: &[(&str, &str)],
21    payload: String,
22    read_timeout_secs: u64,
23) -> Result<(u16, String), String> {
24    let agent: ureq::Agent = ureq::Agent::config_builder()
25        .timeout_connect(Some(Duration::from_secs(10)))
26        .timeout_send_request(Some(Duration::from_secs(30)))
27        .timeout_recv_response(Some(Duration::from_secs(read_timeout_secs)))
28        .timeout_recv_body(Some(Duration::from_secs(read_timeout_secs)))
29        .http_status_as_error(false)
30        .build()
31        .into();
32
33    let mut req = agent
34        .post(url)
35        .header("Content-Type", "application/json")
36        .header("Accept", "application/json");
37    for (k, v) in extra_headers {
38        req = req.header(*k, *v);
39    }
40    let trimmed_key = api_key.trim();
41    if !trimmed_key.is_empty() {
42        req = req.header("Authorization", &format!("Bearer {}", trimmed_key));
43    }
44
45    match req.send(payload) {
46        Ok(mut resp) => {
47            let status = resp.status().as_u16();
48            let body = resp
49                .body_mut()
50                .read_to_string()
51                .map_err(|err| format!("failed to read response body: {err}"))?;
52            Ok((status, body))
53        }
54        Err(err) => Err(format!("{err}")),
55    }
56}
57
/// Default OpenAI embedding model name.
pub const DEFAULT_OPENAI_EMBEDDING_MODEL: &str = "text-embedding-3-small";
/// Default base URL for the OpenAI API (no trailing slash).
pub const DEFAULT_OPENAI_API_BASE: &str = "https://api.openai.com/v1";
/// Default OpenAI chat model name for prompt requests.
pub const DEFAULT_OPENAI_PROMPT_MODEL: &str = "gpt-4.1-mini";
/// Default Anthropic model name for prompt requests.
pub const DEFAULT_ANTHROPIC_PROMPT_MODEL: &str = "claude-3-5-haiku-latest";
/// Default base URL for the Anthropic API (no trailing slash).
pub const DEFAULT_ANTHROPIC_API_BASE: &str = "https://api.anthropic.com/v1";
/// Default value for the `anthropic-version` request header.
pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01";
64
/// Input for an OpenAI-compatible `/embeddings` call.
#[derive(Clone, Debug)]
pub struct OpenAiEmbeddingRequest {
    /// Provider API key; the embedding functions trim it and omit the
    /// bearer header when it is blank.
    pub api_key: String,
    /// Embedding model name; must not be blank.
    pub model: String,
    /// Texts to embed; at least one is required.
    pub inputs: Vec<String>,
    /// Optional `dimensions` value forwarded in the request payload.
    pub dimensions: Option<usize>,
    /// API base URL; trailing slashes are stripped before `/embeddings`
    /// is appended.
    pub api_base: String,
}
73
/// Parsed result of an OpenAI-compatible embeddings call.
#[derive(Clone, Debug)]
pub struct OpenAiEmbeddingResponse {
    /// Static provider label.
    pub provider: &'static str,
    /// Model name associated with the response.
    pub model: String,
    /// One embedding vector per returned item.
    pub embeddings: Vec<Vec<f32>>,
    /// Prompt token usage, when available.
    pub prompt_tokens: Option<u64>,
    /// Total token usage, when available.
    pub total_tokens: Option<u64>,
}
82
/// Input for an OpenAI-compatible `/chat/completions` call.
#[derive(Clone, Debug)]
pub struct OpenAiPromptRequest {
    /// Provider API key.
    pub api_key: String,
    /// Chat model name; must not be blank.
    pub model: String,
    /// User prompt, sent as a single `user` message; must not be blank.
    pub prompt: String,
    /// Optional sampling temperature.
    pub temperature: Option<f32>,
    /// Optional `seed` value forwarded in the payload.
    pub seed: Option<u64>,
    /// Optional output cap, sent as `max_tokens`.
    pub max_output_tokens: Option<usize>,
    /// API base URL; trailing slashes are stripped before
    /// `/chat/completions` is appended.
    pub api_base: String,
    /// Whether to request a streamed response. Honoured by
    /// `openai_prompt_async`; `openai_prompt` never streams and
    /// `openai_prompt_streaming` always does.
    pub stream: bool,
}
94
/// Input for an Anthropic `/messages` call.
#[derive(Clone, Debug)]
pub struct AnthropicPromptRequest {
    /// Anthropic API key, sent in the `x-api-key` header; must not be blank.
    pub api_key: String,
    /// Model name; must not be blank.
    pub model: String,
    /// User prompt, sent as a single `user` message; must not be blank.
    pub prompt: String,
    /// Optional sampling temperature.
    pub temperature: Option<f32>,
    /// Output cap sent as `max_tokens`; 512 is used when unset.
    pub max_output_tokens: Option<usize>,
    /// API base URL; trailing slashes are stripped before `/messages`
    /// is appended.
    pub api_base: String,
    /// Value of the `anthropic-version` request header.
    pub anthropic_version: String,
}
105
/// Provider-independent result of a prompt call.
#[derive(Clone, Debug)]
pub struct AiPromptResponse {
    /// Static provider label (`"openai"` or `"anthropic"` in this module).
    pub provider: &'static str,
    /// Model reported by the provider, or the requested model when the
    /// response does not name one.
    pub model: String,
    /// Full generated text.
    pub output_text: String,
    /// Individual streamed text chunks; `None` for non-streaming responses.
    pub output_chunks: Option<Vec<String>>,
    /// Prompt/input token usage, when reported.
    pub prompt_tokens: Option<u64>,
    /// Completion/output token usage, when reported.
    pub completion_tokens: Option<u64>,
    /// Total token usage, reported or derived from the two counts above.
    pub total_tokens: Option<u64>,
    /// Provider stop/finish reason, when reported.
    pub stop_reason: Option<String>,
}
117
118#[deprecated(
119    since = "1.0.0",
120    note = "use AiBatchClient::embed_batch for embeddings or openai_embeddings_async with AiTransport when token usage metadata is required"
121)]
122pub fn openai_embeddings(request: OpenAiEmbeddingRequest) -> RedDBResult<OpenAiEmbeddingResponse> {
123    if request.model.trim().is_empty() {
124        return Err(RedDBError::Query(
125            "OpenAI embedding model cannot be empty".to_string(),
126        ));
127    }
128    if request.inputs.is_empty() {
129        return Err(RedDBError::Query(
130            "at least one input is required for embeddings".to_string(),
131        ));
132    }
133
134    let url = format!("{}/embeddings", request.api_base.trim_end_matches('/'));
135    let payload =
136        build_openai_embedding_payload(&request.model, &request.inputs, request.dimensions);
137
138    let (status, body) = http_post_json(&url, &request.api_key, &[], payload, 90)
139        .map_err(|err| RedDBError::Query(format!("OpenAI transport error: {err}")))?;
140
141    if !(200..300).contains(&status) {
142        let message = openai_error_message(&body)
143            .unwrap_or_else(|| "OpenAI embeddings request failed".to_string());
144        return Err(RedDBError::Query(format!(
145            "OpenAI embeddings request failed (status {status}): {message}"
146        )));
147    }
148
149    parse_openai_embedding_response(&body)
150}
151
152#[deprecated(since = "1.0.0", note = "use openai_prompt_async with AiTransport")]
153pub fn openai_prompt(request: OpenAiPromptRequest) -> RedDBResult<AiPromptResponse> {
154    if request.model.trim().is_empty() {
155        return Err(RedDBError::Query(
156            "OpenAI prompt model cannot be empty".to_string(),
157        ));
158    }
159    if request.prompt.trim().is_empty() {
160        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
161    }
162
163    let url = format!(
164        "{}/chat/completions",
165        request.api_base.trim_end_matches('/')
166    );
167    let payload = build_openai_prompt_payload(
168        &request.model,
169        &request.prompt,
170        request.temperature,
171        request.seed,
172        request.max_output_tokens,
173        false,
174    );
175
176    let (status, body) = http_post_json(&url, &request.api_key, &[], payload, 120)
177        .map_err(|err| RedDBError::Query(format!("OpenAI transport error: {err}")))?;
178
179    if !(200..300).contains(&status) {
180        let message = openai_error_message(&body)
181            .unwrap_or_else(|| "OpenAI prompt request failed".to_string());
182        return Err(RedDBError::Query(format!(
183            "OpenAI prompt request failed (status {status}): {message}"
184        )));
185    }
186
187    parse_openai_prompt_response(&body, &request.model)
188}
189
190#[deprecated(since = "1.0.0", note = "use anthropic_prompt_async with AiTransport")]
191pub fn anthropic_prompt(request: AnthropicPromptRequest) -> RedDBResult<AiPromptResponse> {
192    if request.api_key.trim().is_empty() {
193        return Err(RedDBError::Query(
194            "Anthropic API key cannot be empty".to_string(),
195        ));
196    }
197    if request.model.trim().is_empty() {
198        return Err(RedDBError::Query(
199            "Anthropic model cannot be empty".to_string(),
200        ));
201    }
202    if request.prompt.trim().is_empty() {
203        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
204    }
205
206    let url = format!("{}/messages", request.api_base.trim_end_matches('/'));
207    let payload = build_anthropic_prompt_payload(
208        &request.model,
209        &request.prompt,
210        request.temperature,
211        request.max_output_tokens,
212    );
213
214    // Anthropic uses its own `x-api-key` header instead of
215    // `Authorization: Bearer`, so skip the shared helper's default
216    // auth header path — we pass an empty API key and set
217    // `x-api-key` via extra_headers instead.
218    let extra = [
219        ("x-api-key", request.api_key.as_str()),
220        ("anthropic-version", request.anthropic_version.as_str()),
221    ];
222    let (status, body) = http_post_json(&url, "", &extra, payload, 120)
223        .map_err(|err| RedDBError::Query(format!("Anthropic transport error: {err}")))?;
224
225    if !(200..300).contains(&status) {
226        let message = anthropic_error_message(&body)
227            .unwrap_or_else(|| "Anthropic prompt request failed".to_string());
228        return Err(RedDBError::Query(format!(
229            "Anthropic prompt request failed (status {status}): {message}"
230        )));
231    }
232
233    parse_anthropic_prompt_response(&body, &request.model)
234}
235
236/// Async OpenAI-compatible embeddings via [`AiTransport`].
237///
238/// Uses the transport's connection pool and retry policy (429/5xx backoff)
239/// instead of the deprecated one-shot blocking path.
240pub async fn openai_embeddings_async(
241    transport: &crate::runtime::ai::transport::AiTransport,
242    request: OpenAiEmbeddingRequest,
243) -> RedDBResult<OpenAiEmbeddingResponse> {
244    if request.model.trim().is_empty() {
245        return Err(RedDBError::Query(
246            "OpenAI embedding model cannot be empty".to_string(),
247        ));
248    }
249    if request.inputs.is_empty() {
250        return Err(RedDBError::Query(
251            "at least one input is required for embeddings".to_string(),
252        ));
253    }
254
255    let url = format!("{}/embeddings", request.api_base.trim_end_matches('/'));
256    let payload =
257        build_openai_embedding_payload(&request.model, &request.inputs, request.dimensions);
258    let mut http_req =
259        crate::runtime::ai::transport::AiHttpRequest::post_json("openai-compatible", url, payload);
260    let trimmed_key = request.api_key.trim();
261    if !trimmed_key.is_empty() {
262        http_req = http_req.header("authorization", format!("Bearer {}", trimmed_key));
263    }
264
265    let response = transport
266        .request(http_req)
267        .await
268        .map_err(|e| RedDBError::Query(e.to_string()))?;
269
270    parse_openai_embedding_response(&response.body)
271}
272
273/// Async OpenAI chat-completion prompt via [`AiTransport`].
274///
275/// Uses the transport's connection pool and retry policy (429/5xx backoff)
276/// instead of the deprecated one-shot blocking path.
277pub async fn openai_prompt_async(
278    transport: &crate::runtime::ai::transport::AiTransport,
279    request: OpenAiPromptRequest,
280) -> RedDBResult<AiPromptResponse> {
281    if request.model.trim().is_empty() {
282        return Err(RedDBError::Query(
283            "OpenAI prompt model cannot be empty".to_string(),
284        ));
285    }
286    if request.prompt.trim().is_empty() {
287        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
288    }
289
290    let url = format!(
291        "{}/chat/completions",
292        request.api_base.trim_end_matches('/')
293    );
294    let payload = build_openai_prompt_payload(
295        &request.model,
296        &request.prompt,
297        request.temperature,
298        request.seed,
299        request.max_output_tokens,
300        request.stream,
301    );
302    let http_req = crate::runtime::ai::transport::AiHttpRequest::post_json("openai", url, payload)
303        .model(request.model.clone())
304        .header("authorization", format!("Bearer {}", request.api_key));
305
306    let response = transport
307        .request(http_req)
308        .await
309        .map_err(|e| RedDBError::Query(e.to_string()))?;
310
311    if request.stream {
312        parse_openai_streaming_prompt_response(&response.body, &request.model)
313    } else {
314        parse_openai_prompt_response(&response.body, &request.model)
315    }
316}
317
318/// Blocking OpenAI-compatible streaming prompt.
319///
320/// This is used by the socket-level `ASK ... STREAM` path so each provider
321/// `delta.content` can be forwarded to the HTTP client before the provider
322/// body has completed.
323pub fn openai_prompt_streaming(
324    request: OpenAiPromptRequest,
325    mut on_chunk: impl FnMut(&str) -> RedDBResult<()>,
326) -> RedDBResult<AiPromptResponse> {
327    if request.model.trim().is_empty() {
328        return Err(RedDBError::Query(
329            "OpenAI prompt model cannot be empty".to_string(),
330        ));
331    }
332    if request.prompt.trim().is_empty() {
333        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
334    }
335
336    let url = format!(
337        "{}/chat/completions",
338        request.api_base.trim_end_matches('/')
339    );
340    let payload = build_openai_prompt_payload(
341        &request.model,
342        &request.prompt,
343        request.temperature,
344        request.seed,
345        request.max_output_tokens,
346        true,
347    );
348
349    let agent: ureq::Agent = ureq::Agent::config_builder()
350        .timeout_connect(Some(Duration::from_secs(10)))
351        .timeout_send_request(Some(Duration::from_secs(30)))
352        .timeout_recv_response(Some(Duration::from_secs(120)))
353        .timeout_recv_body(Some(Duration::from_secs(120)))
354        .http_status_as_error(false)
355        .build()
356        .into();
357
358    let mut req = agent
359        .post(&url)
360        .header("content-type", "application/json")
361        .header("accept", "text/event-stream");
362    let trimmed_key = request.api_key.trim();
363    if !trimmed_key.is_empty() {
364        req = req.header("authorization", &format!("Bearer {}", trimmed_key));
365    }
366
367    let mut response = req
368        .send(payload)
369        .map_err(|err| RedDBError::Query(format!("OpenAI transport error: {err}")))?;
370    let status = response.status().as_u16();
371    if !(200..300).contains(&status) {
372        let body = response
373            .body_mut()
374            .read_to_string()
375            .unwrap_or_else(|err| format!("failed to read response body: {err}"));
376        let message = openai_error_message(&body)
377            .unwrap_or_else(|| "OpenAI prompt request failed".to_string());
378        return Err(RedDBError::Query(format!(
379            "OpenAI prompt request failed (status {status}): {message}"
380        )));
381    }
382
383    let mut model = request.model;
384    let mut chunks = Vec::new();
385    let mut prompt_tokens = None;
386    let mut completion_tokens = None;
387    let mut total_tokens = None;
388    let mut stop_reason = None;
389
390    let mut reader = std::io::BufReader::new(response.body_mut().as_reader());
391    let mut line = String::new();
392    loop {
393        line.clear();
394        let read = reader.read_line(&mut line).map_err(|err| {
395            RedDBError::Query(format!("failed to read OpenAI streaming response: {err}"))
396        })?;
397        if read == 0 {
398            break;
399        }
400
401        let trimmed = line.trim();
402        let Some(data) = trimmed.strip_prefix("data:") else {
403            continue;
404        };
405        let data = data.trim();
406        if data.is_empty() {
407            continue;
408        }
409        if data == "[DONE]" {
410            break;
411        }
412
413        let parsed = parse_json(data).map_err(|err| {
414            RedDBError::Query(format!(
415                "invalid OpenAI streaming prompt JSON response: {err}"
416            ))
417        })?;
418        let json = JsonValue::from(parsed);
419        if let Some(value) = json.get("model").and_then(JsonValue::as_str) {
420            model = value.to_string();
421        }
422        if let Some(usage) = json.get("usage") {
423            prompt_tokens = usage
424                .get("prompt_tokens")
425                .and_then(JsonValue::as_i64)
426                .and_then(|value| u64::try_from(value).ok())
427                .or(prompt_tokens);
428            completion_tokens = usage
429                .get("completion_tokens")
430                .and_then(JsonValue::as_i64)
431                .and_then(|value| u64::try_from(value).ok())
432                .or(completion_tokens);
433            total_tokens = usage
434                .get("total_tokens")
435                .and_then(JsonValue::as_i64)
436                .and_then(|value| u64::try_from(value).ok())
437                .or(total_tokens);
438        }
439
440        let Some(choices) = json.get("choices").and_then(JsonValue::as_array) else {
441            continue;
442        };
443        let Some(first_choice) = choices.first() else {
444            continue;
445        };
446        if let Some(reason) = first_choice
447            .get("finish_reason")
448            .and_then(JsonValue::as_str)
449        {
450            stop_reason = Some(reason.to_string());
451        }
452        if let Some(text) = first_choice
453            .get("delta")
454            .and_then(|delta| delta.get("content"))
455            .and_then(JsonValue::as_str)
456        {
457            if !text.is_empty() {
458                on_chunk(text)?;
459                chunks.push(text.to_string());
460            }
461        }
462    }
463
464    if chunks.is_empty() {
465        return Err(RedDBError::Query(
466            "OpenAI streaming prompt response missing text content".to_string(),
467        ));
468    }
469
470    let output_text = chunks.concat();
471    let total_tokens = total_tokens.or_else(|| match (prompt_tokens, completion_tokens) {
472        (Some(prompt), Some(completion)) => Some(prompt.saturating_add(completion)),
473        _ => None,
474    });
475
476    Ok(AiPromptResponse {
477        provider: "openai",
478        model,
479        output_text,
480        output_chunks: Some(chunks),
481        prompt_tokens,
482        completion_tokens,
483        total_tokens,
484        stop_reason,
485    })
486}
487
488/// Async Anthropic messages-API prompt via [`AiTransport`].
489///
490/// Uses the transport's connection pool and retry policy (429/5xx backoff)
491/// instead of the deprecated one-shot blocking path.
492pub async fn anthropic_prompt_async(
493    transport: &crate::runtime::ai::transport::AiTransport,
494    request: AnthropicPromptRequest,
495) -> RedDBResult<AiPromptResponse> {
496    if request.api_key.trim().is_empty() {
497        return Err(RedDBError::Query(
498            "Anthropic API key cannot be empty".to_string(),
499        ));
500    }
501    if request.model.trim().is_empty() {
502        return Err(RedDBError::Query(
503            "Anthropic model cannot be empty".to_string(),
504        ));
505    }
506    if request.prompt.trim().is_empty() {
507        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
508    }
509
510    let url = format!("{}/messages", request.api_base.trim_end_matches('/'));
511    let payload = build_anthropic_prompt_payload(
512        &request.model,
513        &request.prompt,
514        request.temperature,
515        request.max_output_tokens,
516    );
517    let http_req =
518        crate::runtime::ai::transport::AiHttpRequest::post_json("anthropic", url, payload)
519            .model(request.model.clone())
520            .header("x-api-key", request.api_key)
521            .header("anthropic-version", request.anthropic_version);
522
523    let response = transport
524        .request(http_req)
525        .await
526        .map_err(|e| RedDBError::Query(e.to_string()))?;
527
528    parse_anthropic_prompt_response(&response.body, &request.model)
529}
530
531/// Build an OpenAI-compatible embedding request payload.
532pub(crate) fn build_embedding_payload(model: &str, inputs: &[String]) -> String {
533    build_openai_embedding_payload(model, inputs, None)
534}
535
536/// Parse an OpenAI-compatible embedding response, returning only the vectors.
537pub(crate) fn parse_embedding_vectors(body: &str) -> Result<Vec<Vec<f32>>, String> {
538    parse_openai_embedding_response(body)
539        .map(|r| r.embeddings)
540        .map_err(|e| e.to_string())
541}
542
543pub(crate) fn parse_embedding_response(body: &str) -> Result<OpenAiEmbeddingResponse, String> {
544    parse_openai_embedding_response(body).map_err(|e| e.to_string())
545}
546
547fn build_openai_embedding_payload(
548    model: &str,
549    inputs: &[String],
550    dimensions: Option<usize>,
551) -> String {
552    let mut object = Map::new();
553    object.insert("model".to_string(), JsonValue::String(model.to_string()));
554    if inputs.len() == 1 {
555        object.insert("input".to_string(), JsonValue::String(inputs[0].clone()));
556    } else {
557        object.insert(
558            "input".to_string(),
559            JsonValue::Array(inputs.iter().cloned().map(JsonValue::String).collect()),
560        );
561    }
562    if let Some(dimensions) = dimensions {
563        object.insert(
564            "dimensions".to_string(),
565            JsonValue::Number(dimensions as f64),
566        );
567    }
568    object.insert(
569        "encoding_format".to_string(),
570        JsonValue::String("float".to_string()),
571    );
572    JsonValue::Object(object).to_string_compact()
573}
574
575fn openai_error_message(body: &str) -> Option<String> {
576    provider_error_message(body)
577}
578
579fn anthropic_error_message(body: &str) -> Option<String> {
580    provider_error_message(body)
581}
582
583fn provider_error_message(body: &str) -> Option<String> {
584    let parsed = parse_json(body).ok().map(JsonValue::from)?;
585    let error = parsed.get("error")?;
586    if let Some(message) = error.get("message").and_then(JsonValue::as_str) {
587        let trimmed = message.trim();
588        if !trimmed.is_empty() {
589            return Some(trimmed.to_string());
590        }
591    }
592    None
593}
594
595fn build_openai_prompt_payload(
596    model: &str,
597    prompt: &str,
598    temperature: Option<f32>,
599    seed: Option<u64>,
600    max_output_tokens: Option<usize>,
601    stream: bool,
602) -> String {
603    let mut object = Map::new();
604    object.insert("model".to_string(), JsonValue::String(model.to_string()));
605
606    let mut message = Map::new();
607    message.insert("role".to_string(), JsonValue::String("user".to_string()));
608    message.insert("content".to_string(), JsonValue::String(prompt.to_string()));
609    object.insert(
610        "messages".to_string(),
611        JsonValue::Array(vec![JsonValue::Object(message)]),
612    );
613
614    if let Some(temperature) = temperature {
615        object.insert(
616            "temperature".to_string(),
617            JsonValue::Number(temperature as f64),
618        );
619    }
620
621    if let Some(seed) = seed {
622        object.insert("seed".to_string(), JsonValue::Number(seed as f64));
623    }
624
625    if let Some(max_output_tokens) = max_output_tokens {
626        object.insert(
627            "max_tokens".to_string(),
628            JsonValue::Number(max_output_tokens as f64),
629        );
630    }
631
632    if stream {
633        object.insert("stream".to_string(), JsonValue::Bool(true));
634        let mut options = Map::new();
635        options.insert("include_usage".to_string(), JsonValue::Bool(true));
636        object.insert("stream_options".to_string(), JsonValue::Object(options));
637    }
638
639    JsonValue::Object(object).to_string_compact()
640}
641
642fn build_anthropic_prompt_payload(
643    model: &str,
644    prompt: &str,
645    temperature: Option<f32>,
646    max_output_tokens: Option<usize>,
647) -> String {
648    let mut object = Map::new();
649    object.insert("model".to_string(), JsonValue::String(model.to_string()));
650    object.insert(
651        "max_tokens".to_string(),
652        JsonValue::Number(max_output_tokens.unwrap_or(512) as f64),
653    );
654
655    let mut message = Map::new();
656    message.insert("role".to_string(), JsonValue::String("user".to_string()));
657    message.insert("content".to_string(), JsonValue::String(prompt.to_string()));
658    object.insert(
659        "messages".to_string(),
660        JsonValue::Array(vec![JsonValue::Object(message)]),
661    );
662
663    if let Some(temperature) = temperature {
664        object.insert(
665            "temperature".to_string(),
666            JsonValue::Number(temperature as f64),
667        );
668    }
669
670    JsonValue::Object(object).to_string_compact()
671}
672
673fn extract_text_from_parts(parts: &[JsonValue]) -> Option<String> {
674    let mut chunks = Vec::new();
675    for part in parts {
676        if let Some(text) = part.as_str() {
677            let trimmed = text.trim();
678            if !trimmed.is_empty() {
679                chunks.push(trimmed.to_string());
680            }
681            continue;
682        }
683
684        let Some(object) = part.as_object() else {
685            continue;
686        };
687        let Some(text) = object.get("text").and_then(JsonValue::as_str) else {
688            continue;
689        };
690        let trimmed = text.trim();
691        if !trimmed.is_empty() {
692            chunks.push(trimmed.to_string());
693        }
694    }
695
696    if chunks.is_empty() {
697        None
698    } else {
699        Some(chunks.join("\n\n"))
700    }
701}
702
703fn parse_openai_prompt_response(
704    body: &str,
705    requested_model: &str,
706) -> RedDBResult<AiPromptResponse> {
707    let parsed = parse_json(body)
708        .map_err(|err| RedDBError::Query(format!("invalid OpenAI prompt JSON response: {err}")))?;
709    let json = JsonValue::from(parsed);
710
711    let model = json
712        .get("model")
713        .and_then(JsonValue::as_str)
714        .unwrap_or(requested_model)
715        .to_string();
716
717    let Some(choices) = json.get("choices").and_then(JsonValue::as_array) else {
718        return Err(RedDBError::Query(
719            "OpenAI prompt response missing 'choices' array".to_string(),
720        ));
721    };
722    let Some(first_choice) = choices.first() else {
723        return Err(RedDBError::Query(
724            "OpenAI prompt response contains no choices".to_string(),
725        ));
726    };
727
728    let output_text = first_choice
729        .get("message")
730        .and_then(|message| {
731            if let Some(text) = message.get("content").and_then(JsonValue::as_str) {
732                let trimmed = text.trim();
733                if !trimmed.is_empty() {
734                    return Some(trimmed.to_string());
735                }
736            }
737            message
738                .get("content")
739                .and_then(JsonValue::as_array)
740                .and_then(extract_text_from_parts)
741        })
742        .ok_or_else(|| {
743            RedDBError::Query("OpenAI prompt response missing text content".to_string())
744        })?;
745
746    let prompt_tokens = json
747        .get("usage")
748        .and_then(|usage| usage.get("prompt_tokens"))
749        .and_then(JsonValue::as_i64)
750        .and_then(|value| u64::try_from(value).ok());
751    let completion_tokens = json
752        .get("usage")
753        .and_then(|usage| usage.get("completion_tokens"))
754        .and_then(JsonValue::as_i64)
755        .and_then(|value| u64::try_from(value).ok());
756    let total_tokens = json
757        .get("usage")
758        .and_then(|usage| usage.get("total_tokens"))
759        .and_then(JsonValue::as_i64)
760        .and_then(|value| u64::try_from(value).ok())
761        .or_else(|| match (prompt_tokens, completion_tokens) {
762            (Some(prompt), Some(completion)) => Some(prompt.saturating_add(completion)),
763            _ => None,
764        });
765
766    let stop_reason = first_choice
767        .get("finish_reason")
768        .and_then(JsonValue::as_str)
769        .map(str::to_string);
770
771    Ok(AiPromptResponse {
772        provider: "openai",
773        model,
774        output_text,
775        output_chunks: None,
776        prompt_tokens,
777        completion_tokens,
778        total_tokens,
779        stop_reason,
780    })
781}
782
783fn parse_openai_streaming_prompt_response(
784    body: &str,
785    requested_model: &str,
786) -> RedDBResult<AiPromptResponse> {
787    let mut model = requested_model.to_string();
788    let mut chunks = Vec::new();
789    let mut prompt_tokens = None;
790    let mut completion_tokens = None;
791    let mut total_tokens = None;
792    let mut stop_reason = None;
793
794    for line in body.lines() {
795        let line = line.trim();
796        let Some(data) = line.strip_prefix("data:") else {
797            continue;
798        };
799        let data = data.trim();
800        if data.is_empty() {
801            continue;
802        }
803        if data == "[DONE]" {
804            break;
805        }
806
807        let parsed = parse_json(data).map_err(|err| {
808            RedDBError::Query(format!(
809                "invalid OpenAI streaming prompt JSON response: {err}"
810            ))
811        })?;
812        let json = JsonValue::from(parsed);
813        if let Some(value) = json.get("model").and_then(JsonValue::as_str) {
814            model = value.to_string();
815        }
816        if let Some(usage) = json.get("usage") {
817            prompt_tokens = usage
818                .get("prompt_tokens")
819                .and_then(JsonValue::as_i64)
820                .and_then(|value| u64::try_from(value).ok())
821                .or(prompt_tokens);
822            completion_tokens = usage
823                .get("completion_tokens")
824                .and_then(JsonValue::as_i64)
825                .and_then(|value| u64::try_from(value).ok())
826                .or(completion_tokens);
827            total_tokens = usage
828                .get("total_tokens")
829                .and_then(JsonValue::as_i64)
830                .and_then(|value| u64::try_from(value).ok())
831                .or(total_tokens);
832        }
833
834        let Some(choices) = json.get("choices").and_then(JsonValue::as_array) else {
835            continue;
836        };
837        let Some(first_choice) = choices.first() else {
838            continue;
839        };
840        if let Some(reason) = first_choice
841            .get("finish_reason")
842            .and_then(JsonValue::as_str)
843        {
844            stop_reason = Some(reason.to_string());
845        }
846        if let Some(text) = first_choice
847            .get("delta")
848            .and_then(|delta| delta.get("content"))
849            .and_then(JsonValue::as_str)
850        {
851            if !text.is_empty() {
852                chunks.push(text.to_string());
853            }
854        }
855    }
856
857    if chunks.is_empty() {
858        return Err(RedDBError::Query(
859            "OpenAI streaming prompt response missing text content".to_string(),
860        ));
861    }
862
863    let output_text = chunks.concat();
864    let total_tokens = total_tokens.or_else(|| match (prompt_tokens, completion_tokens) {
865        (Some(prompt), Some(completion)) => Some(prompt.saturating_add(completion)),
866        _ => None,
867    });
868
869    Ok(AiPromptResponse {
870        provider: "openai",
871        model,
872        output_text,
873        output_chunks: Some(chunks),
874        prompt_tokens,
875        completion_tokens,
876        total_tokens,
877        stop_reason,
878    })
879}
880
881fn parse_anthropic_prompt_response(
882    body: &str,
883    requested_model: &str,
884) -> RedDBResult<AiPromptResponse> {
885    let parsed = parse_json(body).map_err(|err| {
886        RedDBError::Query(format!("invalid Anthropic prompt JSON response: {err}"))
887    })?;
888    let json = JsonValue::from(parsed);
889
890    let model = json
891        .get("model")
892        .and_then(JsonValue::as_str)
893        .unwrap_or(requested_model)
894        .to_string();
895
896    let Some(content_parts) = json.get("content").and_then(JsonValue::as_array) else {
897        return Err(RedDBError::Query(
898            "Anthropic prompt response missing 'content' array".to_string(),
899        ));
900    };
901
902    let output_text = extract_text_from_parts(content_parts).ok_or_else(|| {
903        RedDBError::Query("Anthropic prompt response missing text content".to_string())
904    })?;
905
906    let prompt_tokens = json
907        .get("usage")
908        .and_then(|usage| usage.get("input_tokens"))
909        .and_then(JsonValue::as_i64)
910        .and_then(|value| u64::try_from(value).ok());
911    let completion_tokens = json
912        .get("usage")
913        .and_then(|usage| usage.get("output_tokens"))
914        .and_then(JsonValue::as_i64)
915        .and_then(|value| u64::try_from(value).ok());
916    let total_tokens = match (prompt_tokens, completion_tokens) {
917        (Some(prompt), Some(completion)) => Some(prompt.saturating_add(completion)),
918        _ => None,
919    };
920
921    let stop_reason = json
922        .get("stop_reason")
923        .and_then(JsonValue::as_str)
924        .map(str::to_string);
925
926    Ok(AiPromptResponse {
927        provider: "anthropic",
928        model,
929        output_text,
930        output_chunks: None,
931        prompt_tokens,
932        completion_tokens,
933        total_tokens,
934        stop_reason,
935    })
936}
937
938fn parse_openai_embedding_response(body: &str) -> RedDBResult<OpenAiEmbeddingResponse> {
939    let parsed = parse_json(body).map_err(|err| {
940        RedDBError::Query(format!("invalid OpenAI embeddings JSON response: {err}"))
941    })?;
942    let json = JsonValue::from(parsed);
943
944    let model = json
945        .get("model")
946        .and_then(JsonValue::as_str)
947        .unwrap_or(DEFAULT_OPENAI_EMBEDDING_MODEL)
948        .to_string();
949
950    let Some(data) = json.get("data").and_then(JsonValue::as_array) else {
951        return Err(RedDBError::Query(
952            "OpenAI response missing 'data' array".to_string(),
953        ));
954    };
955
956    let mut rows: Vec<(usize, Vec<f32>)> = Vec::with_capacity(data.len());
957    for (position, item) in data.iter().enumerate() {
958        let index = item
959            .get("index")
960            .and_then(JsonValue::as_i64)
961            .and_then(|value| usize::try_from(value).ok())
962            .unwrap_or(position);
963
964        let Some(embedding_values) = item.get("embedding").and_then(JsonValue::as_array) else {
965            return Err(RedDBError::Query(
966                "OpenAI response contains item without 'embedding' array".to_string(),
967            ));
968        };
969        if embedding_values.is_empty() {
970            return Err(RedDBError::Query(
971                "OpenAI response contains empty embedding vector".to_string(),
972            ));
973        }
974
975        let mut embedding = Vec::with_capacity(embedding_values.len());
976        for value in embedding_values {
977            let Some(number) = value.as_f64() else {
978                return Err(RedDBError::Query(
979                    "OpenAI response contains non-numeric embedding value".to_string(),
980                ));
981            };
982            embedding.push(number as f32);
983        }
984        rows.push((index, embedding));
985    }
986    rows.sort_by_key(|(index, _)| *index);
987    let embeddings = rows.into_iter().map(|(_, embedding)| embedding).collect();
988
989    let prompt_tokens = json
990        .get("usage")
991        .and_then(|usage| usage.get("prompt_tokens"))
992        .and_then(JsonValue::as_i64)
993        .and_then(|value| u64::try_from(value).ok());
994    let total_tokens = json
995        .get("usage")
996        .and_then(|usage| usage.get("total_tokens"))
997        .and_then(JsonValue::as_i64)
998        .and_then(|value| u64::try_from(value).ok());
999
1000    Ok(OpenAiEmbeddingResponse {
1001        provider: "openai",
1002        model,
1003        embeddings,
1004        prompt_tokens,
1005        total_tokens,
1006    })
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use super::*;
1012
1013    #[test]
1014    fn parse_openai_embedding_response_extracts_vectors() {
1015        let body = r#"{
1016          "object":"list",
1017          "data":[
1018            {"object":"embedding","index":1,"embedding":[0.3,0.4]},
1019            {"object":"embedding","index":0,"embedding":[0.1,0.2]}
1020          ],
1021          "model":"text-embedding-3-small",
1022          "usage":{"prompt_tokens":12,"total_tokens":12}
1023        }"#;
1024
1025        let result = parse_openai_embedding_response(body).expect("response should parse");
1026        assert_eq!(result.provider, "openai");
1027        assert_eq!(result.model, "text-embedding-3-small");
1028        assert_eq!(result.embeddings.len(), 2);
1029        assert_eq!(result.embeddings[0], vec![0.1, 0.2]);
1030        assert_eq!(result.embeddings[1], vec![0.3, 0.4]);
1031        assert_eq!(result.prompt_tokens, Some(12));
1032        assert_eq!(result.total_tokens, Some(12));
1033    }
1034
1035    #[test]
1036    fn openai_error_message_extracts_nested_message() {
1037        let body = r#"{"error":{"message":"bad api key","type":"invalid_request_error"}}"#;
1038        assert_eq!(openai_error_message(body).as_deref(), Some("bad api key"));
1039    }
1040
1041    #[test]
1042    fn parse_openai_prompt_response_extracts_text_and_usage() {
1043        let body = r#"{
1044          "id":"chatcmpl_1",
1045          "object":"chat.completion",
1046          "model":"gpt-4.1-mini",
1047          "choices":[
1048            {
1049              "index":0,
1050              "finish_reason":"stop",
1051              "message":{"role":"assistant","content":"Resumo pronto."}
1052            }
1053          ],
1054          "usage":{"prompt_tokens":10,"completion_tokens":4,"total_tokens":14}
1055        }"#;
1056
1057        let parsed =
1058            parse_openai_prompt_response(body, DEFAULT_OPENAI_PROMPT_MODEL).expect("parse");
1059        assert_eq!(parsed.provider, "openai");
1060        assert_eq!(parsed.model, "gpt-4.1-mini");
1061        assert_eq!(parsed.output_text, "Resumo pronto.");
1062        assert_eq!(parsed.prompt_tokens, Some(10));
1063        assert_eq!(parsed.completion_tokens, Some(4));
1064        assert_eq!(parsed.total_tokens, Some(14));
1065        assert_eq!(parsed.stop_reason.as_deref(), Some("stop"));
1066    }
1067
1068    #[test]
1069    fn parse_anthropic_prompt_response_extracts_text_and_usage() {
1070        let body = r#"{
1071          "id":"msg_1",
1072          "model":"claude-3-5-haiku-latest",
1073          "type":"message",
1074          "content":[{"type":"text","text":"Action complete."}],
1075          "usage":{"input_tokens":11,"output_tokens":5},
1076          "stop_reason":"end_turn"
1077        }"#;
1078
1079        let parsed =
1080            parse_anthropic_prompt_response(body, DEFAULT_ANTHROPIC_PROMPT_MODEL).expect("parse");
1081        assert_eq!(parsed.provider, "anthropic");
1082        assert_eq!(parsed.model, "claude-3-5-haiku-latest");
1083        assert_eq!(parsed.output_text, "Action complete.");
1084        assert_eq!(parsed.prompt_tokens, Some(11));
1085        assert_eq!(parsed.completion_tokens, Some(5));
1086        assert_eq!(parsed.total_tokens, Some(16));
1087        assert_eq!(parsed.stop_reason.as_deref(), Some("end_turn"));
1088    }
1089
1090    #[test]
1091    fn resolve_api_key_prefers_vault_secret_over_legacy_config() {
1092        let provider = AiProvider::OpenAi;
1093        let alias = "vault_unit_alias";
1094        let secret_path = ai_api_secret_path(&provider, alias);
1095        let legacy_key = ai_api_legacy_config_key(&provider, alias);
1096
1097        let resolved = resolve_api_key(&provider, Some(alias), |key| {
1098            if key == secret_path {
1099                Ok(Some("vault-key".to_string()))
1100            } else if key == legacy_key {
1101                Ok(Some("legacy-key".to_string()))
1102            } else {
1103                Ok(None)
1104            }
1105        })
1106        .expect("resolve");
1107
1108        assert_eq!(resolved, "vault-key");
1109    }
1110
1111    #[test]
1112    fn resolve_api_key_uses_default_vault_secret_path() {
1113        let provider = AiProvider::OpenAi;
1114        let secret_path = ai_api_secret_path(&provider, "default");
1115
1116        let resolved = resolve_api_key(&provider, None, |key| {
1117            if key == secret_path {
1118                Ok(Some("default-vault-key".to_string()))
1119            } else {
1120                Ok(None)
1121            }
1122        })
1123        .expect("resolve");
1124
1125        assert_eq!(resolved, "default-vault-key");
1126    }
1127
1128    #[test]
1129    fn openai_prompt_payload_includes_temperature_and_seed_when_present() {
1130        let payload = build_openai_prompt_payload(
1131            "gpt-4.1-mini",
1132            "hello",
1133            Some(0.0),
1134            Some(42),
1135            Some(128),
1136            false,
1137        );
1138        let parsed = JsonValue::from(parse_json(&payload).expect("valid json"));
1139
1140        assert_eq!(
1141            parsed.get("temperature").and_then(JsonValue::as_f64),
1142            Some(0.0)
1143        );
1144        assert_eq!(parsed.get("seed").and_then(JsonValue::as_u64), Some(42));
1145        assert_eq!(
1146            parsed.get("max_tokens").and_then(JsonValue::as_u64),
1147            Some(128)
1148        );
1149    }
1150
1151    #[test]
1152    fn openai_prompt_payload_omits_seed_when_none() {
1153        let payload =
1154            build_openai_prompt_payload("gpt-4.1-mini", "hello", Some(0.0), None, None, false);
1155        let parsed = JsonValue::from(parse_json(&payload).expect("valid json"));
1156
1157        assert!(parsed.get("seed").is_none());
1158        assert!(parsed.get("stream").is_none());
1159        assert_eq!(
1160            parsed.get("temperature").and_then(JsonValue::as_f64),
1161            Some(0.0)
1162        );
1163    }
1164
1165    #[test]
1166    fn openai_prompt_payload_enables_stream_options() {
1167        let payload =
1168            build_openai_prompt_payload("gpt-4.1-mini", "hello", Some(0.0), None, None, true);
1169        let parsed = JsonValue::from(parse_json(&payload).expect("valid json"));
1170
1171        assert_eq!(
1172            parsed.get("stream").and_then(JsonValue::as_bool),
1173            Some(true)
1174        );
1175        assert_eq!(
1176            parsed
1177                .get("stream_options")
1178                .and_then(|value| value.get("include_usage"))
1179                .and_then(JsonValue::as_bool),
1180            Some(true)
1181        );
1182    }
1183
1184    #[test]
1185    fn openai_streaming_prompt_response_collects_delta_chunks() {
1186        let body = concat!(
1187            "data: {\"model\":\"gpt-test\",\"choices\":[{\"delta\":{\"content\":\"login \"},\"finish_reason\":null}]}\n\n",
1188            "data: {\"model\":\"gpt-test\",\"choices\":[{\"delta\":{\"content\":\"failed\"},\"finish_reason\":null}]}\n\n",
1189            "data: {\"model\":\"gpt-test\",\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":12,\"completion_tokens\":2,\"total_tokens\":14}}\n\n",
1190            "data: [DONE]\n\n",
1191        );
1192        let parsed = parse_openai_streaming_prompt_response(body, "fallback").unwrap();
1193
1194        assert_eq!(parsed.model, "gpt-test");
1195        assert_eq!(parsed.output_text, "login failed");
1196        assert_eq!(
1197            parsed.output_chunks.as_deref(),
1198            Some(["login ".to_string(), "failed".to_string()].as_slice())
1199        );
1200        assert_eq!(parsed.prompt_tokens, Some(12));
1201        assert_eq!(parsed.completion_tokens, Some(2));
1202        assert_eq!(parsed.total_tokens, Some(14));
1203        assert_eq!(parsed.stop_reason.as_deref(), Some("stop"));
1204    }
1205
1206    #[tokio::test]
1207    async fn openai_prompt_async_rejects_empty_model() {
1208        let transport = crate::runtime::ai::transport::AiTransport::new(Default::default());
1209        let request = OpenAiPromptRequest {
1210            api_key: "key".to_string(),
1211            model: "  ".to_string(),
1212            prompt: "hello".to_string(),
1213            temperature: None,
1214            seed: None,
1215            max_output_tokens: None,
1216            api_base: "https://api.openai.com/v1".to_string(),
1217            stream: false,
1218        };
1219        let err = openai_prompt_async(&transport, request).await.unwrap_err();
1220        assert!(err.to_string().contains("model cannot be empty"));
1221    }
1222
1223    #[tokio::test]
1224    async fn openai_prompt_async_rejects_empty_prompt() {
1225        let transport = crate::runtime::ai::transport::AiTransport::new(Default::default());
1226        let request = OpenAiPromptRequest {
1227            api_key: "key".to_string(),
1228            model: "gpt-4.1-mini".to_string(),
1229            prompt: "".to_string(),
1230            temperature: None,
1231            seed: None,
1232            max_output_tokens: None,
1233            api_base: "https://api.openai.com/v1".to_string(),
1234            stream: false,
1235        };
1236        let err = openai_prompt_async(&transport, request).await.unwrap_err();
1237        assert!(err.to_string().contains("prompt cannot be empty"));
1238    }
1239
1240    // ========================================================================
1241    // openai-compat client tests (issue gh-516)
1242    //
1243    // Each test spins up a tiny TCP server, hands its base URL to the
1244    // new generic client, and asserts on the captured request +
1245    // synthesised response. Tests run in parallel-safe fashion (each
1246    // server binds to port 0).
1247    // ========================================================================
1248
1249    use std::io::{Read as _, Write as _};
1250    use std::net::TcpListener;
1251    use std::sync::{Arc, Mutex};
1252    use std::thread;
1253
    /// Pieces of a single HTTP request as recorded by the mock server used in
    /// the openai-compat client tests.
    struct CapturedRequest {
        /// First whitespace-separated token of the request line (e.g. `POST`).
        method: String,
        /// Second whitespace-separated token of the request line.
        path: String,
        /// Header name/value pairs, each trimmed, in the order they were read.
        headers: Vec<(String, String)>,
        /// Request body, decoded lossily as UTF-8.
        body: String,
    }
1260
1261    fn parse_http_request(stream: &mut std::net::TcpStream) -> CapturedRequest {
1262        let mut buf = [0u8; 8192];
1263        let mut data = Vec::new();
1264        loop {
1265            let read = stream.read(&mut buf).unwrap_or(0);
1266            if read == 0 {
1267                break;
1268            }
1269            data.extend_from_slice(&buf[..read]);
1270            if let Some(idx) = data.windows(4).position(|w| w == b"\r\n\r\n") {
1271                let header_len = idx + 4;
1272                let header_str = String::from_utf8_lossy(&data[..idx]).to_string();
1273                let mut lines = header_str.split("\r\n");
1274                let request_line = lines.next().unwrap_or("");
1275                let mut parts = request_line.split_whitespace();
1276                let method = parts.next().unwrap_or("").to_string();
1277                let path = parts.next().unwrap_or("").to_string();
1278                let mut headers = Vec::new();
1279                let mut content_length: usize = 0;
1280                for line in lines {
1281                    if let Some((k, v)) = line.split_once(':') {
1282                        let k = k.trim().to_string();
1283                        let v = v.trim().to_string();
1284                        if k.eq_ignore_ascii_case("content-length") {
1285                            content_length = v.parse().unwrap_or(0);
1286                        }
1287                        headers.push((k, v));
1288                    }
1289                }
1290                while data.len() < header_len + content_length {
1291                    let read = stream.read(&mut buf).unwrap_or(0);
1292                    if read == 0 {
1293                        break;
1294                    }
1295                    data.extend_from_slice(&buf[..read]);
1296                }
1297                let body =
1298                    String::from_utf8_lossy(&data[header_len..header_len + content_length])
1299                        .to_string();
1300                return CapturedRequest {
1301                    method,
1302                    path,
1303                    headers,
1304                    body,
1305                };
1306            }
1307        }
1308        CapturedRequest {
1309            method: String::new(),
1310            path: String::new(),
1311            headers: Vec::new(),
1312            body: String::new(),
1313        }
1314    }
1315
1316    /// Spawn a one-shot HTTP server that replies with `(status, body)`
1317    /// to a single request, captures it, and returns `(base_url, captured)`.
1318    fn spawn_mock(
1319        status: u16,
1320        response_body: &'static str,
1321    ) -> (String, Arc<Mutex<Option<CapturedRequest>>>) {
1322        let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
1323        let addr = listener.local_addr().expect("addr");
1324        let captured: Arc<Mutex<Option<CapturedRequest>>> = Arc::new(Mutex::new(None));
1325        let captured_clone = Arc::clone(&captured);
1326        thread::spawn(move || {
1327            if let Ok((mut stream, _)) = listener.accept() {
1328                let req = parse_http_request(&mut stream);
1329                *captured_clone.lock().unwrap() = Some(req);
1330                let status_line = match status {
1331                    200 => "200 OK",
1332                    400 => "400 Bad Request",
1333                    401 => "401 Unauthorized",
1334                    500 => "500 Internal Server Error",
1335                    _ => "200 OK",
1336                };
1337                let resp = format!(
1338                    "HTTP/1.1 {status_line}\r\n\
1339                     Content-Type: application/json\r\n\
1340                     Content-Length: {}\r\n\
1341                     Connection: close\r\n\r\n{}",
1342                    response_body.len(),
1343                    response_body
1344                );
1345                let _ = stream.write_all(resp.as_bytes());
1346            }
1347        });
1348        (format!("http://{}", addr), captured)
1349    }
1350
1351    #[test]
1352    fn openai_compat_chat_roundtrip_honors_arbitrary_api_base_and_headers() {
1353        let body = r#"{
1354            "id":"chatcmpl_x",
1355            "model":"custom-model",
1356            "choices":[{"index":0,"finish_reason":"stop","message":{"role":"assistant","content":"hi"}}],
1357            "usage":{"prompt_tokens":7,"completion_tokens":2,"total_tokens":9}
1358        }"#;
1359        let (base, captured) = spawn_mock(200, body);
1360
1361        let req = OpenAiCompatChatRequest {
1362            api_base: base.clone(),
1363            api_key: "sk-test".to_string(),
1364            model: "custom-model".to_string(),
1365            prompt: "say hi".to_string(),
1366            temperature: None,
1367            seed: None,
1368            max_output_tokens: None,
1369            extra_headers: vec![("X-Custom-Tag".to_string(), "abc".to_string())],
1370        };
1371        let resp = openai_compat_chat(req).expect("ok");
1372
1373        assert_eq!(resp.output_text, "hi");
1374        assert_eq!(resp.model, "custom-model");
1375        assert_eq!(resp.usage.input_tokens, Some(7));
1376        assert_eq!(resp.usage.output_tokens, Some(2));
1377        assert_eq!(resp.usage.total_tokens, Some(9));
1378        assert_eq!(resp.stop_reason.as_deref(), Some("stop"));
1379
1380        let cap = captured.lock().unwrap().take().expect("captured");
1381        assert_eq!(cap.method, "POST");
1382        assert_eq!(cap.path, "/chat/completions");
1383        let has_auth = cap
1384            .headers
1385            .iter()
1386            .any(|(k, v)| k.eq_ignore_ascii_case("authorization") && v == "Bearer sk-test");
1387        assert!(has_auth, "Authorization header missing");
1388        let has_custom = cap
1389            .headers
1390            .iter()
1391            .any(|(k, v)| k.eq_ignore_ascii_case("x-custom-tag") && v == "abc");
1392        assert!(has_custom, "extra header missing");
1393        assert!(cap.body.contains("\"model\":\"custom-model\""));
1394    }
1395
1396    #[test]
1397    fn openai_compat_embeddings_roundtrip_with_dimensions() {
1398        let body = r#"{
1399            "object":"list",
1400            "model":"embed-model",
1401            "data":[{"object":"embedding","index":0,"embedding":[0.5,0.25]}],
1402            "usage":{"prompt_tokens":4,"total_tokens":4}
1403        }"#;
1404        let (base, captured) = spawn_mock(200, body);
1405
1406        let req = OpenAiCompatEmbeddingsRequest {
1407            api_base: base,
1408            api_key: "sk-emb".to_string(),
1409            model: "embed-model".to_string(),
1410            inputs: vec!["hello".to_string()],
1411            dimensions: Some(2),
1412            extra_headers: vec![],
1413        };
1414        let resp = openai_compat_embeddings(req).expect("ok");
1415
1416        assert_eq!(resp.embeddings.len(), 1);
1417        assert_eq!(resp.embeddings[0], vec![0.5_f32, 0.25_f32]);
1418        assert_eq!(resp.usage.total_tokens, Some(4));
1419        assert_eq!(resp.usage.input_tokens, Some(4));
1420
1421        let cap = captured.lock().unwrap().take().expect("captured");
1422        assert_eq!(cap.path, "/embeddings");
1423        assert!(cap.body.contains("\"dimensions\":2"));
1424    }
1425
1426    #[test]
1427    fn openai_compat_chat_non_2xx_returns_structured_error() {
1428        let body = r#"{"error":{"message":"bad api key","type":"invalid_request_error"}}"#;
1429        let (base, _captured) = spawn_mock(401, body);
1430
1431        let req = OpenAiCompatChatRequest {
1432            api_base: base,
1433            api_key: "bad".to_string(),
1434            model: "m".to_string(),
1435            prompt: "hi".to_string(),
1436            temperature: None,
1437            seed: None,
1438            max_output_tokens: None,
1439            extra_headers: vec![],
1440        };
1441        let err = openai_compat_chat(req).unwrap_err().to_string();
1442        assert!(err.contains("status 401"), "got: {err}");
1443        assert!(err.contains("bad api key"), "got: {err}");
1444    }
1445
1446    #[test]
1447    fn openai_compat_chat_rejects_empty_model_and_prompt() {
1448        let req = OpenAiCompatChatRequest {
1449            api_base: "http://localhost:1".to_string(),
1450            api_key: "k".to_string(),
1451            model: "  ".to_string(),
1452            prompt: "hi".to_string(),
1453            temperature: None,
1454            seed: None,
1455            max_output_tokens: None,
1456            extra_headers: vec![],
1457        };
1458        let err = openai_compat_chat(req).unwrap_err().to_string();
1459        assert!(err.contains("model cannot be empty"), "got: {err}");
1460
1461        let req = OpenAiCompatChatRequest {
1462            api_base: "http://localhost:1".to_string(),
1463            api_key: "k".to_string(),
1464            model: "m".to_string(),
1465            prompt: "  ".to_string(),
1466            temperature: None,
1467            seed: None,
1468            max_output_tokens: None,
1469            extra_headers: vec![],
1470        };
1471        let err = openai_compat_chat(req).unwrap_err().to_string();
1472        assert!(err.contains("prompt cannot be empty"), "got: {err}");
1473    }
1474
1475    #[test]
1476    fn parse_provider_mode_recognizes_all_three_tokens() {
1477        assert_eq!(
1478            parse_provider_mode("openai-compat"),
1479            Some(AiProviderMode::OpenAiCompat)
1480        );
1481        assert_eq!(
1482            parse_provider_mode("OPENAI_NATIVE"),
1483            Some(AiProviderMode::OpenAiNative)
1484        );
1485        assert_eq!(
1486            parse_provider_mode("anthropic-native"),
1487            Some(AiProviderMode::AnthropicNative)
1488        );
1489        assert_eq!(parse_provider_mode("groq"), None);
1490    }
1491
1492    #[test]
1493    fn resolve_provider_mode_reads_kv_key() {
1494        let kv = |key: &str| -> crate::RedDBResult<Option<String>> {
1495            if key == "red.config.ai.provider" {
1496                Ok(Some("anthropic-native".to_string()))
1497            } else {
1498                Ok(None)
1499            }
1500        };
1501        assert_eq!(
1502            resolve_provider_mode(&kv),
1503            Some(AiProviderMode::AnthropicNative)
1504        );
1505    }
1506
1507    #[test]
1508    fn resolve_default_provider_honors_mode_key() {
1509        let kv = |key: &str| -> crate::RedDBResult<Option<String>> {
1510            match key {
1511                "red.config.ai.provider" => Ok(Some("anthropic-native".to_string())),
1512                "red.config.ai.default.provider" => Ok(Some("groq".to_string())),
1513                _ => Ok(None),
1514            }
1515        };
1516        assert_eq!(resolve_default_provider(&kv), AiProvider::Anthropic);
1517    }
1518
1519    #[tokio::test]
1520    async fn anthropic_prompt_async_rejects_empty_api_key() {
1521        let transport = crate::runtime::ai::transport::AiTransport::new(Default::default());
1522        let request = AnthropicPromptRequest {
1523            api_key: "  ".to_string(),
1524            model: "claude-3-5-haiku-latest".to_string(),
1525            prompt: "hello".to_string(),
1526            temperature: None,
1527            max_output_tokens: None,
1528            api_base: "https://api.anthropic.com/v1".to_string(),
1529            anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(),
1530        };
1531        let err = anthropic_prompt_async(&transport, request)
1532            .await
1533            .unwrap_err();
1534        assert!(err.to_string().contains("API key cannot be empty"));
1535    }
1536}
1537
1538// ============================================================================
1539// Provider & Credential Resolution (shared between HTTP, gRPC, and runtime)
1540// ============================================================================
1541
1542/// AI provider identifier.
1543#[derive(Debug, Clone, PartialEq, Eq)]
1544pub enum AiProvider {
1545    OpenAi,
1546    Anthropic,
1547    Groq,
1548    OpenRouter,
1549    Together,
1550    Venice,
1551    Ollama,
1552    DeepSeek,
1553    HuggingFace,
1554    Local,
1555    Custom(String),
1556}
1557
1558impl AiProvider {
1559    pub fn token(&self) -> &str {
1560        match self {
1561            Self::OpenAi => "openai",
1562            Self::Anthropic => "anthropic",
1563            Self::Groq => "groq",
1564            Self::OpenRouter => "openrouter",
1565            Self::Together => "together",
1566            Self::Venice => "venice",
1567            Self::Ollama => "ollama",
1568            Self::DeepSeek => "deepseek",
1569            Self::HuggingFace => "huggingface",
1570            Self::Local => "local",
1571            Self::Custom(name) => name.as_str(),
1572        }
1573    }
1574
1575    pub fn default_prompt_model(&self) -> &str {
1576        match self {
1577            Self::OpenAi => DEFAULT_OPENAI_PROMPT_MODEL,
1578            Self::Anthropic => DEFAULT_ANTHROPIC_PROMPT_MODEL,
1579            Self::Groq => "llama-3.3-70b-versatile",
1580            Self::OpenRouter => "auto",
1581            Self::Together => "meta-llama/Meta-Llama-3-8B-Instruct",
1582            Self::Venice => "llama-3.3-70b",
1583            Self::Ollama => "llama3",
1584            Self::DeepSeek => "deepseek-chat",
1585            Self::HuggingFace => "mistralai/Mistral-7B-Instruct-v0.3",
1586            Self::Local => "sentence-transformers/all-MiniLM-L6-v2",
1587            Self::Custom(_) => DEFAULT_OPENAI_PROMPT_MODEL,
1588        }
1589    }
1590
1591    pub fn prompt_model_env_name(&self) -> String {
1592        format!("REDDB_{}_PROMPT_MODEL", self.token().to_ascii_uppercase())
1593    }
1594
1595    pub fn default_embedding_model(&self) -> &str {
1596        match self {
1597            Self::Ollama => "nomic-embed-text",
1598            Self::HuggingFace | Self::Local => "sentence-transformers/all-MiniLM-L6-v2",
1599            _ => DEFAULT_OPENAI_EMBEDDING_MODEL,
1600        }
1601    }
1602
1603    pub fn default_api_base(&self) -> &str {
1604        match self {
1605            Self::OpenAi => DEFAULT_OPENAI_API_BASE,
1606            Self::Anthropic => DEFAULT_ANTHROPIC_API_BASE,
1607            Self::Groq => "https://api.groq.com/openai/v1",
1608            Self::OpenRouter => "https://openrouter.ai/api/v1",
1609            Self::Together => "https://api.together.xyz/v1",
1610            Self::Venice => "https://api.venice.ai/api/v1",
1611            Self::Ollama => "http://localhost:11434/v1",
1612            Self::DeepSeek => "https://api.deepseek.com/v1",
1613            Self::HuggingFace => "https://api-inference.huggingface.co",
1614            Self::Local => "local",
1615            Self::Custom(base) => base.as_str(),
1616        }
1617    }
1618
1619    pub fn api_base_env_name(&self) -> String {
1620        format!("REDDB_{}_API_BASE", self.token().to_ascii_uppercase())
1621    }
1622
1623    pub fn default_key_env_name(&self) -> String {
1624        format!("REDDB_{}_API_KEY", self.token().to_ascii_uppercase())
1625    }
1626
1627    pub fn alias_key_env_name(&self, alias: &str) -> String {
1628        let normalized = normalize_alias_token(alias);
1629        format!(
1630            "REDDB_{}_API_KEY_{normalized}",
1631            self.token().to_ascii_uppercase()
1632        )
1633    }
1634
1635    pub fn resolve_api_base(&self) -> String {
1636        if let Ok(value) = std::env::var(self.api_base_env_name()) {
1637            let value = value.trim().to_string();
1638            if !value.is_empty() {
1639                return value;
1640            }
1641        }
1642        self.default_api_base().to_string()
1643    }
1644
1645    /// Resolve API base URL checking KV store too (for custom base_url config).
1646    pub fn resolve_api_base_with_kv<F>(&self, alias: &str, kv_getter: &F) -> String
1647    where
1648        F: Fn(&str) -> crate::RedDBResult<Option<String>>,
1649    {
1650        // 1. Env var
1651        if let Ok(value) = std::env::var(self.api_base_env_name()) {
1652            let value = value.trim().to_string();
1653            if !value.is_empty() {
1654                return value;
1655            }
1656        }
1657        // 2. KV store: {provider}/{alias}/base_url
1658        let kv_key = format!("red.config.ai.{}.{alias}.base_url", self.token());
1659        if let Ok(Some(value)) = kv_getter(&kv_key) {
1660            let value = value.trim().to_string();
1661            if !value.is_empty() {
1662                return value;
1663            }
1664        }
1665        self.default_api_base().to_string()
1666    }
1667
1668    /// Whether this provider uses the OpenAI-compatible API format.
1669    pub fn is_openai_compatible(&self) -> bool {
1670        matches!(
1671            self,
1672            Self::OpenAi
1673                | Self::Groq
1674                | Self::OpenRouter
1675                | Self::Together
1676                | Self::Venice
1677                | Self::Ollama
1678                | Self::DeepSeek
1679                | Self::Custom(_)
1680        )
1681    }
1682
1683    /// Whether this provider requires an API key (Ollama/Local don't).
1684    pub fn requires_api_key(&self) -> bool {
1685        !matches!(self, Self::Ollama | Self::Local)
1686    }
1687}
1688
1689/// Parse a provider string into AiProvider.
1690pub fn parse_provider(name: &str) -> crate::RedDBResult<AiProvider> {
1691    match name.trim().to_ascii_lowercase().as_str() {
1692        "openai" => Ok(AiProvider::OpenAi),
1693        "anthropic" => Ok(AiProvider::Anthropic),
1694        "groq" => Ok(AiProvider::Groq),
1695        "openrouter" | "open_router" => Ok(AiProvider::OpenRouter),
1696        "together" => Ok(AiProvider::Together),
1697        "venice" => Ok(AiProvider::Venice),
1698        "ollama" => Ok(AiProvider::Ollama),
1699        "deepseek" | "deep_seek" => Ok(AiProvider::DeepSeek),
1700        "huggingface" | "hf" => Ok(AiProvider::HuggingFace),
1701        "local" => Ok(AiProvider::Local),
1702        other => {
1703            // Treat as custom provider if it looks like a URL
1704            if other.starts_with("http://") || other.starts_with("https://") {
1705                Ok(AiProvider::Custom(other.to_string()))
1706            } else {
1707                Err(crate::RedDBError::Query(format!(
1708                    "unsupported AI provider '{other}'; expected: openai, anthropic, groq, \
1709                     openrouter, together, venice, ollama, deepseek, huggingface, local"
1710                )))
1711            }
1712        }
1713    }
1714}
1715
1716/// Resolve the default AI provider. Checks:
1717/// 1. `REDDB_AI_PROVIDER` env var
1718/// 2. `red_config` KV key `red.config.ai.default.provider`
1719/// 3. Falls back to OpenAI
1720pub fn resolve_default_provider<F>(kv_getter: &F) -> AiProvider
1721where
1722    F: Fn(&str) -> crate::RedDBResult<Option<String>>,
1723{
1724    // 0. New mode selector (red.config.ai.provider) takes precedence
1725    //    when explicitly set — it picks the wire-protocol family.
1726    if let Some(mode) = resolve_provider_mode(kv_getter) {
1727        return provider_mode_to_provider(mode);
1728    }
1729    // 1. Env var
1730    if let Ok(value) = std::env::var("REDDB_AI_PROVIDER") {
1731        let value = value.trim().to_string();
1732        if !value.is_empty() {
1733            if let Ok(provider) = parse_provider(&value) {
1734                return provider;
1735            }
1736        }
1737    }
1738    // 2. KV store
1739    if let Ok(Some(value)) = kv_getter("red.config.ai.default.provider") {
1740        let value = value.trim().to_string();
1741        if !value.is_empty() {
1742            if let Ok(provider) = parse_provider(&value) {
1743                return provider;
1744            }
1745        }
1746    }
1747    AiProvider::OpenAi
1748}
1749
1750/// Resolve the default AI model. Checks:
1751/// 1. `REDDB_AI_MODEL` env var
1752/// 2. `red_config` KV key `red.config.ai.default.model`
1753/// 3. Falls back to provider's default
1754pub fn resolve_default_model<F>(provider: &AiProvider, kv_getter: &F) -> String
1755where
1756    F: Fn(&str) -> crate::RedDBResult<Option<String>>,
1757{
1758    // 1. Env var
1759    if let Ok(value) = std::env::var("REDDB_AI_MODEL") {
1760        let value = value.trim().to_string();
1761        if !value.is_empty() {
1762            return value;
1763        }
1764    }
1765    // 2. Provider-specific env var
1766    if let Ok(value) = std::env::var(provider.prompt_model_env_name()) {
1767        let value = value.trim().to_string();
1768        if !value.is_empty() {
1769            return value;
1770        }
1771    }
1772    // 3. KV store
1773    if let Ok(Some(value)) = kv_getter("red.config.ai.default.model") {
1774        let value = value.trim().to_string();
1775        if !value.is_empty() {
1776            return value;
1777        }
1778    }
1779    provider.default_prompt_model().to_string()
1780}
1781
1782/// Resolve default provider + model from runtime KV store.
1783pub fn resolve_defaults_from_runtime(
1784    runtime: &crate::runtime::RedDBRuntime,
1785) -> (AiProvider, String) {
1786    use crate::application::ports::RuntimeEntityPort;
1787    let kv_getter = |key: &str| -> crate::RedDBResult<Option<String>> {
1788        match runtime.get_kv("red_config", key)? {
1789            Some((crate::storage::schema::Value::Text(s), _)) => Ok(Some(s.to_string())),
1790            _ => Ok(None),
1791        }
1792    };
1793    let provider = resolve_default_provider(&kv_getter);
1794    let model = resolve_default_model(&provider, &kv_getter);
1795    (provider, model)
1796}
1797
1798/// Resolve default provider + model via RuntimeEntityPort trait (for use in QueryUseCases).
1799pub fn resolve_defaults_from_runtime_port<
1800    P: crate::application::ports::RuntimeEntityPort + ?Sized,
1801>(
1802    runtime: &P,
1803) -> (AiProvider, String) {
1804    let kv_getter = |key: &str| -> crate::RedDBResult<Option<String>> {
1805        match runtime.get_kv("red_config", key)? {
1806            Some((crate::storage::schema::Value::Text(s), _)) => Ok(Some(s.to_string())),
1807            _ => Ok(None),
1808        }
1809    };
1810    let provider = resolve_default_provider(&kv_getter);
1811    let model = resolve_default_model(&provider, &kv_getter);
1812    (provider, model)
1813}
1814
1815/// Resolve an API key for a provider. Uses the chain:
1816/// 1. Environment variable with alias: `REDDB_OPENAI_API_KEY_{ALIAS}`
1817/// 2. Vault secret lookup via `kv_getter` closure
1818/// 3. Legacy KV store lookup via `kv_getter` closure
1819/// 4. Default environment variable: `REDDB_OPENAI_API_KEY`
1820///
1821/// `kv_getter` receives either a `red.secret.*` path or a legacy `red.config.*`
1822/// key and returns the value if found.
1823pub fn resolve_api_key<F>(
1824    provider: &AiProvider,
1825    credential_alias: Option<&str>,
1826    kv_getter: F,
1827) -> crate::RedDBResult<String>
1828where
1829    F: Fn(&str) -> crate::RedDBResult<Option<String>>,
1830{
1831    // Providers that don't require API keys
1832    if !provider.requires_api_key() {
1833        // Still try to find a key (user may have one for auth'd Ollama)
1834        if let Ok(value) = std::env::var(provider.default_key_env_name()) {
1835            let value = value.trim().to_string();
1836            if !value.is_empty() {
1837                return Ok(value);
1838            }
1839        }
1840        return Ok(String::new());
1841    }
1842
1843    if let Some(alias) = credential_alias.map(str::trim).filter(|a| !a.is_empty()) {
1844        // Try env var with alias
1845        if let Some(key) = resolve_key_from_env_alias(provider, alias) {
1846            return Ok(key);
1847        }
1848        if let Some(key) = kv_getter(&ai_api_secret_path(provider, alias))? {
1849            if !key.trim().is_empty() {
1850                return Ok(key);
1851            }
1852        }
1853        if let Some(secret_ref) = kv_getter(&ai_api_secret_ref_config_key(provider, alias))? {
1854            if let Some(key) = kv_getter(secret_ref.trim())? {
1855                if !key.trim().is_empty() {
1856                    return Ok(key);
1857                }
1858            }
1859        }
1860        let legacy_key = ai_api_legacy_config_key(provider, alias);
1861        if let Some(key) = kv_getter(&legacy_key)? {
1862            if !key.trim().is_empty() {
1863                return Ok(key);
1864            }
1865        }
1866        return Err(crate::RedDBError::Query(format!(
1867            "credential '{alias}' not found for {}. Set env {} or store it in the vault",
1868            provider.token(),
1869            provider.alias_key_env_name(alias)
1870        )));
1871    }
1872
1873    // Default env var
1874    if let Ok(value) = std::env::var(provider.default_key_env_name()) {
1875        let value = value.trim().to_string();
1876        if !value.is_empty() {
1877            return Ok(value);
1878        }
1879    }
1880
1881    if let Some(key) = kv_getter(&ai_api_secret_path(provider, "default"))? {
1882        if !key.trim().is_empty() {
1883            return Ok(key);
1884        }
1885    }
1886    if let Some(secret_ref) = kv_getter(&ai_api_secret_ref_config_key(provider, "default"))? {
1887        if let Some(key) = kv_getter(secret_ref.trim())? {
1888            if !key.trim().is_empty() {
1889                return Ok(key);
1890            }
1891        }
1892    }
1893    if let Some(key) = kv_getter(&ai_api_legacy_config_key(provider, "default"))? {
1894        if !key.trim().is_empty() {
1895            return Ok(key);
1896        }
1897    }
1898
1899    let legacy_short_key = format!("{}/default", provider.token());
1900    if let Some(key) = kv_getter(&legacy_short_key)? {
1901        if !key.trim().is_empty() {
1902            return Ok(key);
1903        }
1904    }
1905
1906    Err(crate::RedDBError::Query(format!(
1907        "missing {} API key. Set {} or provide credential alias",
1908        provider.token(),
1909        provider.default_key_env_name()
1910    )))
1911}
1912
1913pub fn ai_api_secret_path(provider: &AiProvider, alias: &str) -> String {
1914    format!(
1915        "red.secret.ai.{}.{}.api_key",
1916        provider.token(),
1917        normalize_credential_alias_path(alias)
1918    )
1919}
1920
1921pub fn ai_api_secret_ref_config_key(provider: &AiProvider, alias: &str) -> String {
1922    format!(
1923        "red.config.ai.{}.{}.secret_ref",
1924        provider.token(),
1925        normalize_credential_alias_path(alias)
1926    )
1927}
1928
1929pub fn ai_api_legacy_config_key(provider: &AiProvider, alias: &str) -> String {
1930    format!(
1931        "red.config.ai.{}.{}.key",
1932        provider.token(),
1933        normalize_credential_alias_path(alias)
1934    )
1935}
1936
/// Normalise a credential alias for use as a key-path segment: trims
/// surrounding whitespace, ASCII-lowercases the rest, and maps a blank
/// alias to `"default"`.
fn normalize_credential_alias_path(alias: &str) -> String {
    match alias.trim() {
        "" => String::from("default"),
        trimmed => trimmed.to_ascii_lowercase(),
    }
}
1945
1946fn resolve_key_from_env_alias(provider: &AiProvider, alias: &str) -> Option<String> {
1947    let env_name = provider.alias_key_env_name(alias);
1948    std::env::var(env_name)
1949        .ok()
1950        .map(|v| v.trim().to_string())
1951        .filter(|v| !v.is_empty())
1952}
1953
/// Turn an alias into an upper-case token: ASCII alphanumerics are
/// upper-cased, every run of other characters collapses to a single `_`,
/// and leading/trailing underscores are removed.
fn normalize_alias_token(alias: &str) -> String {
    let mut token = String::with_capacity(alias.len());
    for ch in alias.chars() {
        if ch.is_ascii_alphanumeric() {
            token.push(ch.to_ascii_uppercase());
        } else if !token.is_empty() && !token.ends_with('_') {
            // First separator after real content; later ones in the same
            // run are skipped, and leading separators never get emitted.
            token.push('_');
        }
    }
    // At most one separator can be pending at the end.
    if token.ends_with('_') {
        token.pop();
    }
    token
}
1968
1969/// Convenience: resolve API key using a RedDBRuntime's KV store.
1970pub fn resolve_api_key_from_runtime(
1971    provider: &AiProvider,
1972    credential_alias: Option<&str>,
1973    runtime: &crate::runtime::RedDBRuntime,
1974) -> crate::RedDBResult<String> {
1975    use crate::application::ports::RuntimeEntityPort;
1976    resolve_api_key(provider, credential_alias, |kv_key| {
1977        if kv_key.starts_with("red.secret.") {
1978            return Ok(runtime.vault_kv_get(kv_key));
1979        }
1980        match runtime.get_kv("red_config", kv_key)? {
1981            Some((crate::storage::schema::Value::Text(secret), _)) => Ok(Some(secret.to_string())),
1982            Some(_) => Ok(None),
1983            None => Ok(None),
1984        }
1985    })
1986}
1987
1988// ============================================================================
1989// HuggingFace Inference API
1990// ============================================================================
1991
1992/// Generate embeddings via HuggingFace Inference API.
1993pub fn huggingface_embeddings(
1994    api_key: &str,
1995    model: &str,
1996    inputs: &[String],
1997    api_base: &str,
1998) -> crate::RedDBResult<OpenAiEmbeddingResponse> {
1999    let url = format!("{api_base}/pipeline/feature-extraction/{model}");
2000    let mut embeddings = Vec::with_capacity(inputs.len());
2001
2002    for input in inputs {
2003        let payload = crate::serde_json::json!({ "inputs": input }).to_string_compact();
2004        let (status, body_str) = http_post_json(&url, api_key, &[], payload, 90)
2005            .map_err(|e| crate::RedDBError::Query(format!("HuggingFace API error: {e}")))?;
2006        if !(200..300).contains(&status) {
2007            return Err(crate::RedDBError::Query(format!(
2008                "HuggingFace API error (status {status}): {body_str}"
2009            )));
2010        }
2011        let body: JsonValue = crate::serde_json::from_str(&body_str).map_err(|e| {
2012            crate::RedDBError::Query(format!("HuggingFace response parse error: {e}"))
2013        })?;
2014
2015        // HF returns [[f32, ...]] for single input
2016        let vector: Vec<f32> = match &body {
2017            JsonValue::Array(outer) => outer
2018                .iter()
2019                .filter_map(|v| v.as_f64().map(|n| n as f32))
2020                .collect(),
2021            _ => {
2022                return Err(crate::RedDBError::Query(
2023                    "unexpected HuggingFace embedding response format".to_string(),
2024                ))
2025            }
2026        };
2027        embeddings.push(vector);
2028    }
2029
2030    Ok(OpenAiEmbeddingResponse {
2031        provider: "huggingface",
2032        model: model.to_string(),
2033        embeddings,
2034        prompt_tokens: None,
2035        total_tokens: None,
2036    })
2037}
2038
2039/// Generate text via HuggingFace Inference API.
2040pub fn huggingface_prompt(
2041    api_key: &str,
2042    model: &str,
2043    prompt: &str,
2044    temperature: Option<f32>,
2045    max_tokens: Option<usize>,
2046    api_base: &str,
2047) -> crate::RedDBResult<AiPromptResponse> {
2048    let url = format!("{api_base}/models/{model}");
2049    let mut params = Map::new();
2050    if let Some(t) = temperature {
2051        params.insert("temperature".into(), JsonValue::Number(t as f64));
2052    }
2053    params.insert(
2054        "max_new_tokens".into(),
2055        JsonValue::Number(max_tokens.unwrap_or(512) as f64),
2056    );
2057    let payload = crate::serde_json::json!({
2058        "inputs": prompt,
2059        "parameters": JsonValue::Object(params)
2060    });
2061
2062    let (status, body_str) =
2063        http_post_json(&url, api_key, &[], payload.to_string_compact(), 120)
2064            .map_err(|e| crate::RedDBError::Query(format!("HuggingFace API error: {e}")))?;
2065    if !(200..300).contains(&status) {
2066        return Err(crate::RedDBError::Query(format!(
2067            "HuggingFace API error (status {status}): {body_str}"
2068        )));
2069    }
2070    let body: JsonValue = crate::serde_json::from_str(&body_str)
2071        .map_err(|e| crate::RedDBError::Query(format!("HuggingFace response parse error: {e}")))?;
2072
2073    let output_text = match &body {
2074        JsonValue::Array(arr) => arr
2075            .first()
2076            .and_then(|v| v.get("generated_text"))
2077            .and_then(JsonValue::as_str)
2078            .unwrap_or("")
2079            .to_string(),
2080        _ => body
2081            .get("generated_text")
2082            .and_then(JsonValue::as_str)
2083            .unwrap_or("")
2084            .to_string(),
2085    };
2086
2087    Ok(AiPromptResponse {
2088        provider: "huggingface",
2089        model: model.to_string(),
2090        output_text,
2091        output_chunks: None,
2092        prompt_tokens: None,
2093        completion_tokens: None,
2094        total_tokens: None,
2095        stop_reason: None,
2096    })
2097}
2098
2099// ============================================================================
2100// Local model stubs (requires 'local-models' feature flag)
2101// ============================================================================
2102
2103/// Local embedding via candle — requires `local-models` feature.
2104pub fn local_embeddings(
2105    _model_id: &str,
2106    _texts: &[String],
2107) -> crate::RedDBResult<OpenAiEmbeddingResponse> {
2108    Err(crate::RedDBError::FeatureNotEnabled(
2109        "local model inference requires the 'local-models' feature flag. \
2110         Build with: cargo build --features local-models. \
2111         Alternatively, use 'ollama' provider with a local Ollama server."
2112            .to_string(),
2113    ))
2114}
2115
2116/// Local prompt via candle — requires `local-models` feature.
2117pub fn local_prompt(_model_id: &str, _prompt: &str) -> crate::RedDBResult<AiPromptResponse> {
2118    Err(crate::RedDBError::FeatureNotEnabled(
2119        "local model inference requires the 'local-models' feature flag. \
2120         Build with: cargo build --features local-models. \
2121         Alternatively, use 'ollama' provider with a local Ollama server."
2122            .to_string(),
2123    ))
2124}
2125
2126// ============================================================================
2127// gRPC input collection — parity with HTTP /ai/embeddings
2128// ============================================================================
2129
2130/// Collect embedding inputs from any of the three supported shapes.
2131///
2132/// * `input: "..."` — single string.
2133/// * `inputs: ["...", ...]` — array of strings.
2134/// * `source_query: "SELECT ..."` — runs a SQL query and projects
2135///   either the named `source_field` from each row (source_mode =
2136///   "row", default) or every string cell of every result row
2137///   (source_mode = "result").
2138fn grpc_collect_embedding_inputs(
2139    runtime: &crate::runtime::RedDBRuntime,
2140    payload: &JsonValue,
2141) -> crate::RedDBResult<Vec<String>> {
2142    if let Some(source_query) = payload
2143        .get("source_query")
2144        .and_then(|v| v.as_str())
2145        .map(str::trim)
2146        .filter(|s| !s.is_empty())
2147    {
2148        return grpc_collect_inputs_from_source_query(runtime, payload, source_query);
2149    }
2150
2151    if let Some(arr) = payload.get("inputs").and_then(|v| v.as_array()) {
2152        let mut out = Vec::with_capacity(arr.len());
2153        for (idx, v) in arr.iter().enumerate() {
2154            let text = v.as_str().ok_or_else(|| {
2155                crate::RedDBError::Query(format!("field 'inputs[{idx}]' must be a string"))
2156            })?;
2157            if text.trim().is_empty() {
2158                return Err(crate::RedDBError::Query(format!(
2159                    "field 'inputs[{idx}]' cannot be empty"
2160                )));
2161            }
2162            out.push(text.to_string());
2163        }
2164        if out.is_empty() {
2165            return Err(crate::RedDBError::Query(
2166                "field 'inputs' must be a non-empty array of strings".to_string(),
2167            ));
2168        }
2169        return Ok(out);
2170    }
2171
2172    if let Some(single) = payload
2173        .get("input")
2174        .and_then(|v| v.as_str())
2175        .map(str::trim)
2176        .filter(|s| !s.is_empty())
2177    {
2178        return Ok(vec![single.to_string()]);
2179    }
2180
2181    Err(crate::RedDBError::Query(
2182        "provide either 'input', 'inputs', or 'source_query'".to_string(),
2183    ))
2184}
2185
2186fn grpc_collect_inputs_from_source_query(
2187    runtime: &crate::runtime::RedDBRuntime,
2188    payload: &JsonValue,
2189    source_query: &str,
2190) -> crate::RedDBResult<Vec<String>> {
2191    let result = runtime
2192        .execute_query(source_query)
2193        .map_err(|err| crate::RedDBError::Query(format!("source_query failed: {err}")))?;
2194
2195    let source_mode = payload
2196        .get("source_mode")
2197        .and_then(|v| v.as_str())
2198        .map(str::trim)
2199        .filter(|s| !s.is_empty())
2200        .unwrap_or("row")
2201        .to_ascii_lowercase();
2202
2203    let mut out: Vec<String> = Vec::new();
2204    match source_mode.as_str() {
2205        "row" => {
2206            let field = payload
2207                .get("source_field")
2208                .and_then(|v| v.as_str())
2209                .map(str::trim)
2210                .filter(|s| !s.is_empty())
2211                .ok_or_else(|| {
2212                    crate::RedDBError::Query(
2213                        "field 'source_field' is required when source_mode='row'".to_string(),
2214                    )
2215                })?;
2216            for rec in &result.result.records {
2217                for (key, value) in rec.iter_fields() {
2218                    if key.as_ref() == field {
2219                        if let crate::storage::schema::Value::Text(text) = value {
2220                            let trimmed = text.trim();
2221                            if !trimmed.is_empty() {
2222                                out.push(trimmed.to_string());
2223                            }
2224                        }
2225                    }
2226                }
2227            }
2228        }
2229        "result" => {
2230            for rec in &result.result.records {
2231                for (_, value) in rec.iter_fields() {
2232                    if let crate::storage::schema::Value::Text(text) = value {
2233                        let trimmed = text.trim();
2234                        if !trimmed.is_empty() {
2235                            out.push(trimmed.to_string());
2236                        }
2237                    }
2238                }
2239            }
2240        }
2241        other => {
2242            return Err(crate::RedDBError::Query(format!(
2243                "field 'source_mode' must be 'row' or 'result' (got '{other}')"
2244            )));
2245        }
2246    }
2247
2248    if out.is_empty() {
2249        return Err(crate::RedDBError::Query(
2250            "source_query produced zero non-empty text inputs".to_string(),
2251        ));
2252    }
2253    Ok(out)
2254}
2255
2256// ============================================================================
2257// gRPC stubs — delegate to the same logic as HTTP handlers
2258// ============================================================================
2259
2260/// gRPC embeddings — shared entrypoint that mirrors the HTTP handler.
2261///
2262/// Accepts the same JSON payload shape as `POST /ai/embeddings`:
2263///
2264/// ```json
2265/// { "provider": "openai", "model": "text-embedding-3-small",
2266///   "inputs": ["hello", "world"], "credential": "optional-alias" }
2267/// ```
2268///
2269/// Input shapes at parity with HTTP: `input` (single string),
2270/// `inputs` (array of strings), and `source_query` (SQL that the
2271/// runtime executes to materialise the input texts; `source_mode`
2272/// = `row` needs `source_collection` + `source_field`, `result`
2273/// uses the projected columns). Returns a JSON object with
2274/// `provider`, `model`, `embeddings`, `prompt_tokens`,
2275/// `total_tokens`. Non-OpenAI-compatible providers are rejected
2276/// with a clear message, matching the HTTP handler.
2277pub fn grpc_embeddings(
2278    runtime: &crate::runtime::RedDBRuntime,
2279    payload: &JsonValue,
2280) -> crate::RedDBResult<JsonValue> {
2281    let provider_name = payload
2282        .get("provider")
2283        .and_then(|v| v.as_str())
2284        .map(str::trim)
2285        .filter(|s| !s.is_empty())
2286        .unwrap_or("openai");
2287    let provider = parse_provider(provider_name)?;
2288    // Routing matrix mirrors `handle_ai_embeddings`. See that function
2289    // for the rationale; in short: HuggingFace gets its own wire
2290    // shape, Anthropic fails fast (no embeddings product), and Local
2291    // requires a build-time feature flag.
2292    match &provider {
2293        AiProvider::Anthropic => {
2294            return Err(crate::RedDBError::Query(
2295                "Anthropic does not offer an embeddings API. \
2296                 Re-issue the request against an OpenAI-compatible \
2297                 provider (openai, groq, ollama, openrouter, together, \
2298                 venice, deepseek), HuggingFace, or a custom base URL — \
2299                 RedDB does not silently route embeddings to a \
2300                 different provider than the one you named."
2301                    .to_string(),
2302            ));
2303        }
2304        AiProvider::Local => {
2305            return Err(crate::RedDBError::Query(
2306                "Local embeddings require the `local-models` feature \
2307                 flag at engine build time."
2308                    .to_string(),
2309            ));
2310        }
2311        _ => {}
2312    }
2313
2314    let inputs: Vec<String> = grpc_collect_embedding_inputs(runtime, payload)?;
2315
2316    let model = payload
2317        .get("model")
2318        .and_then(|v| v.as_str())
2319        .map(str::trim)
2320        .filter(|s| !s.is_empty())
2321        .map(str::to_string)
2322        .or_else(|| {
2323            std::env::var(format!(
2324                "REDDB_{}_EMBEDDING_MODEL",
2325                provider.token().to_ascii_uppercase()
2326            ))
2327            .ok()
2328        })
2329        .or_else(|| std::env::var("REDDB_OPENAI_EMBEDDING_MODEL").ok())
2330        .filter(|v| !v.trim().is_empty())
2331        .unwrap_or_else(|| provider.default_embedding_model().to_string());
2332
2333    let credential = payload
2334        .get("credential")
2335        .and_then(|v| v.as_str())
2336        .map(str::to_string);
2337    let api_key = resolve_api_key_from_runtime(&provider, credential.as_deref(), runtime)?;
2338
2339    let dimensions = payload
2340        .get("dimensions")
2341        .and_then(|v| v.as_i64())
2342        .and_then(|v| usize::try_from(v).ok())
2343        .filter(|v| *v > 0);
2344
2345    let response = match &provider {
2346        AiProvider::HuggingFace => {
2347            huggingface_embeddings(&api_key, &model, &inputs, &provider.resolve_api_base())?
2348        }
2349        _ => {
2350            let transport = crate::runtime::ai::transport::AiTransport::from_runtime(runtime);
2351            let request = OpenAiEmbeddingRequest {
2352                api_key,
2353                model,
2354                inputs,
2355                dimensions,
2356                api_base: provider.resolve_api_base(),
2357            };
2358            crate::runtime::ai::block_on_ai(async move {
2359                openai_embeddings_async(&transport, request).await
2360            })
2361            .and_then(|result| result)?
2362        }
2363    };
2364
2365    let embeddings_json: Vec<JsonValue> = response
2366        .embeddings
2367        .into_iter()
2368        .map(|vec| {
2369            JsonValue::Array(
2370                vec.into_iter()
2371                    .map(|f| JsonValue::Number(f as f64))
2372                    .collect(),
2373            )
2374        })
2375        .collect();
2376
2377    let mut obj = Map::new();
2378    obj.insert(
2379        "provider".to_string(),
2380        JsonValue::String(response.provider.to_string()),
2381    );
2382    obj.insert("model".to_string(), JsonValue::String(response.model));
2383    obj.insert("embeddings".to_string(), JsonValue::Array(embeddings_json));
2384    if let Some(pt) = response.prompt_tokens {
2385        obj.insert("prompt_tokens".to_string(), JsonValue::Number(pt as f64));
2386    }
2387    if let Some(tt) = response.total_tokens {
2388        obj.insert("total_tokens".to_string(), JsonValue::Number(tt as f64));
2389    }
2390    Ok(JsonValue::Object(obj))
2391}
2392
2393/// gRPC stub for AI prompt.
2394pub fn grpc_prompt(
2395    _runtime: &crate::runtime::RedDBRuntime,
2396    _payload: &JsonValue,
2397) -> crate::RedDBResult<JsonValue> {
2398    Err(crate::RedDBError::FeatureNotEnabled(
2399        "AI prompt via gRPC requires HTTP endpoint; use POST /ai/prompt".to_string(),
2400    ))
2401}
2402
2403/// gRPC stub for AI credentials.
2404pub fn grpc_credentials(
2405    _runtime: &crate::runtime::RedDBRuntime,
2406    _payload: &JsonValue,
2407) -> crate::RedDBResult<JsonValue> {
2408    Err(crate::RedDBError::FeatureNotEnabled(
2409        "AI credentials via gRPC requires HTTP endpoint; use POST /ai/credentials".to_string(),
2410    ))
2411}
2412
2413// ============================================================================
2414// Generic OpenAI-compatible client (issue gh-516)
2415//
2416// Thin blocking client that targets any `{api_base}/chat/completions`
2417// and `{api_base}/embeddings` endpoint with arbitrary auth headers.
2418// Existing vendor-native paths (`openai_prompt_async`,
2419// `anthropic_prompt_async`) remain unchanged; this exists so callers
2420// can talk to non-OpenAI providers that expose an OpenAI-compatible
2421// surface (Groq, OpenRouter, Together, Ollama, vLLM, LM Studio, ...)
2422// without having to register a new `AiProvider` variant.
2423// ============================================================================
2424
/// Normalized usage block. Field names follow the Anthropic shape
/// (`input_tokens` / `output_tokens`) so downstream cost-accounting
/// has one canonical schema regardless of the upstream provider.
///
/// Every counter is optional; `Default` yields all `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OpenAiCompatUsage {
    /// Tokens consumed by the request; `openai_compat_chat` fills this
    /// from the parsed response's `prompt_tokens`.
    pub input_tokens: Option<u64>,
    /// Tokens produced by the model; `openai_compat_chat` fills this
    /// from the parsed response's `completion_tokens`.
    pub output_tokens: Option<u64>,
    /// Total token count as carried by the parsed response.
    pub total_tokens: Option<u64>,
}
2434
/// Input for `openai_compat_chat`.
#[derive(Debug, Clone)]
pub struct OpenAiCompatChatRequest {
    /// Base URL of the provider; `/chat/completions` is appended after
    /// stripping any trailing `/`.
    pub api_base: String,
    /// API key handed to the shared HTTP helper together with the request.
    pub api_key: String,
    /// Model identifier; must not be blank.
    pub model: String,
    /// Prompt text; must not be blank.
    pub prompt: String,
    /// Optional sampling temperature forwarded to the payload builder.
    pub temperature: Option<f32>,
    /// Optional seed forwarded to the payload builder.
    pub seed: Option<u64>,
    /// Optional output-token cap forwarded to the payload builder.
    pub max_output_tokens: Option<usize>,
    /// Additional `(name, value)` headers passed through to the HTTP call.
    pub extra_headers: Vec<(String, String)>,
}
2446
/// Normalized result of `openai_compat_chat`, copied from the parsed
/// chat-completions response.
#[derive(Debug, Clone)]
pub struct OpenAiCompatChatResponse {
    /// Model name as reported by the parsed response.
    pub model: String,
    /// Generated text.
    pub output_text: String,
    /// Stop reason from the parsed response, when present.
    pub stop_reason: Option<String>,
    /// Token usage mapped onto the normalized usage schema.
    pub usage: OpenAiCompatUsage,
}
2454
/// Input for `openai_compat_embeddings`.
#[derive(Debug, Clone)]
pub struct OpenAiCompatEmbeddingsRequest {
    /// Base URL of the provider; `/embeddings` is appended after stripping
    /// any trailing `/`.
    pub api_base: String,
    /// API key handed to the shared HTTP helper together with the request.
    pub api_key: String,
    /// Embedding model identifier; must not be blank.
    pub model: String,
    /// Texts to embed; at least one is required.
    pub inputs: Vec<String>,
    /// Optional dimension count forwarded to the payload builder.
    pub dimensions: Option<usize>,
    /// Additional `(name, value)` headers passed through to the HTTP call.
    pub extra_headers: Vec<(String, String)>,
}
2464
/// Normalized result of `openai_compat_embeddings`.
#[derive(Debug, Clone)]
pub struct OpenAiCompatEmbeddingsResponse {
    /// Model name associated with the embeddings.
    pub model: String,
    /// Embedding vectors — presumably one per request input, in request
    /// order; NOTE(review): confirm against the response parser.
    pub embeddings: Vec<Vec<f32>>,
    /// Token usage in the normalized usage schema.
    pub usage: OpenAiCompatUsage,
}
2471
/// Borrow owned `(name, value)` header pairs as `(&str, &str)` pairs, in
/// the same order, for APIs that take borrowed headers.
fn extra_header_refs(headers: &[(String, String)]) -> Vec<(&str, &str)> {
    let mut borrowed = Vec::with_capacity(headers.len());
    for (name, value) in headers {
        borrowed.push((name.as_str(), value.as_str()));
    }
    borrowed
}
2478
2479/// POST `{api_base}/chat/completions` and return a normalized response.
2480///
2481/// Errors:
2482/// * empty model / prompt → `RedDBError::Query`.
2483/// * transport / non-2xx → `RedDBError::Query` carrying the status code
2484///   and the provider's parsed `error.message` when available, raw body
2485///   otherwise.
2486pub fn openai_compat_chat(
2487    request: OpenAiCompatChatRequest,
2488) -> RedDBResult<OpenAiCompatChatResponse> {
2489    if request.model.trim().is_empty() {
2490        return Err(RedDBError::Query(
2491            "openai-compat: model cannot be empty".to_string(),
2492        ));
2493    }
2494    if request.prompt.trim().is_empty() {
2495        return Err(RedDBError::Query(
2496            "openai-compat: prompt cannot be empty".to_string(),
2497        ));
2498    }
2499
2500    let url = format!(
2501        "{}/chat/completions",
2502        request.api_base.trim_end_matches('/')
2503    );
2504    let payload = build_openai_prompt_payload(
2505        &request.model,
2506        &request.prompt,
2507        request.temperature,
2508        request.seed,
2509        request.max_output_tokens,
2510        false,
2511    );
2512
2513    let extra = extra_header_refs(&request.extra_headers);
2514    let (status, body) = http_post_json(&url, &request.api_key, &extra, payload, 120)
2515        .map_err(|err| RedDBError::Query(format!("openai-compat transport error: {err}")))?;
2516
2517    if !(200..300).contains(&status) {
2518        let message = openai_error_message(&body).unwrap_or_else(|| {
2519            if body.trim().is_empty() {
2520                "openai-compat chat request failed".to_string()
2521            } else {
2522                body.clone()
2523            }
2524        });
2525        return Err(RedDBError::Query(format!(
2526            "openai-compat chat request failed (status {status}): {message}"
2527        )));
2528    }
2529
2530    let parsed = parse_openai_prompt_response(&body, &request.model)?;
2531    Ok(OpenAiCompatChatResponse {
2532        model: parsed.model,
2533        output_text: parsed.output_text,
2534        stop_reason: parsed.stop_reason,
2535        usage: OpenAiCompatUsage {
2536            input_tokens: parsed.prompt_tokens,
2537            output_tokens: parsed.completion_tokens,
2538            total_tokens: parsed.total_tokens,
2539        },
2540    })
2541}
2542
2543/// POST `{api_base}/embeddings` and return a normalized response.
2544pub fn openai_compat_embeddings(
2545    request: OpenAiCompatEmbeddingsRequest,
2546) -> RedDBResult<OpenAiCompatEmbeddingsResponse> {
2547    if request.model.trim().is_empty() {
2548        return Err(RedDBError::Query(
2549            "openai-compat: embedding model cannot be empty".to_string(),
2550        ));
2551    }
2552    if request.inputs.is_empty() {
2553        return Err(RedDBError::Query(
2554            "openai-compat: at least one input is required".to_string(),
2555        ));
2556    }
2557
2558    let url = format!("{}/embeddings", request.api_base.trim_end_matches('/'));
2559    let payload =
2560        build_openai_embedding_payload(&request.model, &request.inputs, request.dimensions);
2561
2562    let extra = extra_header_refs(&request.extra_headers);
2563    let (status, body) = http_post_json(&url, &request.api_key, &extra, payload, 90)
2564        .map_err(|err| RedDBError::Query(format!("openai-compat transport error: {err}")))?;
2565
2566    if !(200..300).contains(&status) {
2567        let message = openai_error_message(&body).unwrap_or_else(|| {
2568            if body.trim().is_empty() {
2569                "openai-compat embeddings request failed".to_string()
2570            } else {
2571                body.clone()
2572            }
2573        });
2574        return Err(RedDBError::Query(format!(
2575            "openai-compat embeddings request failed (status {status}): {message}"
2576        )));
2577    }
2578
2579    let parsed = parse_openai_embedding_response(&body)?;
2580    Ok(OpenAiCompatEmbeddingsResponse {
2581        model: parsed.model,
2582        embeddings: parsed.embeddings,
2583        usage: OpenAiCompatUsage {
2584            input_tokens: parsed.prompt_tokens,
2585            output_tokens: None,
2586            total_tokens: parsed.total_tokens,
2587        },
2588    })
2589}
2590
2591// ============================================================================
2592// Provider mode selector (issue gh-516)
2593//
2594// `red.config.ai.provider` picks the wire-protocol family that engine
2595// consumers (currently AskPipeline) should use. This is intentionally
2596// distinct from `red.config.ai.default.provider`, which names a
2597// concrete vendor (openai, groq, ollama, ...). The mode selector
2598// answers the prior question of which HTTP shape to speak.
2599// ============================================================================
2600
/// Which wire-protocol family an engine-side AI consumer should speak.
///
/// This selects the HTTP shape only; it does not name a concrete vendor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AiProviderMode {
    /// Generic OpenAI-compatible client (`POST {api_base}/chat/completions`).
    OpenAiCompat,
    /// Vendor-native OpenAI client (api.openai.com, default headers).
    OpenAiNative,
    /// Vendor-native Anthropic client (api.anthropic.com, x-api-key).
    AnthropicNative,
}

impl AiProviderMode {
    /// Canonical lowercase, hyphen-separated token for this mode
    /// (for example `"openai-compat"`).
    pub fn token(&self) -> &'static str {
        match *self {
            AiProviderMode::OpenAiCompat => "openai-compat",
            AiProviderMode::OpenAiNative => "openai-native",
            AiProviderMode::AnthropicNative => "anthropic-native",
        }
    }
}
2621
2622/// Parse a mode token. Accepts hyphen or underscore spellings.
2623pub fn parse_provider_mode(name: &str) -> Option<AiProviderMode> {
2624    match name.trim().to_ascii_lowercase().as_str() {
2625        "openai-compat" | "openai_compat" | "openaicompat" => Some(AiProviderMode::OpenAiCompat),
2626        "openai-native" | "openai_native" | "openainative" => Some(AiProviderMode::OpenAiNative),
2627        "anthropic-native" | "anthropic_native" | "anthropicnative" => {
2628            Some(AiProviderMode::AnthropicNative)
2629        }
2630        _ => None,
2631    }
2632}
2633
2634/// Resolve the provider mode. Lookup chain:
2635/// 1. `REDDB_AI_PROVIDER_MODE` env var.
2636/// 2. `red_config` KV key `red.config.ai.provider`.
2637/// 3. Returns `None` so callers can fall back to their existing
2638///    vendor-based routing.
2639pub fn resolve_provider_mode<F>(kv_getter: &F) -> Option<AiProviderMode>
2640where
2641    F: Fn(&str) -> crate::RedDBResult<Option<String>>,
2642{
2643    if let Ok(value) = std::env::var("REDDB_AI_PROVIDER_MODE") {
2644        if let Some(mode) = parse_provider_mode(&value) {
2645            return Some(mode);
2646        }
2647    }
2648    if let Ok(Some(value)) = kv_getter("red.config.ai.provider") {
2649        if let Some(mode) = parse_provider_mode(&value) {
2650            return Some(mode);
2651        }
2652    }
2653    None
2654}
2655
2656/// Map a mode to the matching [`AiProvider`] variant. `OpenAiCompat`
2657/// stays as a `Custom("")` marker — callers must resolve the actual
2658/// api_base separately (typically via `resolve_api_base_with_kv`).
2659pub fn provider_mode_to_provider(mode: AiProviderMode) -> AiProvider {
2660    match mode {
2661        AiProviderMode::OpenAiNative => AiProvider::OpenAi,
2662        AiProviderMode::AnthropicNative => AiProvider::Anthropic,
2663        AiProviderMode::OpenAiCompat => AiProvider::Custom(String::new()),
2664    }
2665}