Skip to main content

reddb_server/
ai.rs

1//! External AI provider integration primitives.
2//!
3//! This module currently supports OpenAI embeddings and is intended to be
4//! consumed by server handlers and future query/runtime integrations.
5
6use std::io::BufRead;
7use std::time::Duration;
8
9use crate::json::{parse_json, Map, Value as JsonValue};
10use crate::{RedDBError, RedDBResult};
11
12/// Shared HTTP helper for every outbound AI provider call. Centralises
13/// the ureq 3.x builder and error conversion. Returns
14/// `(status, body)` even on 4xx/5xx (via
15/// `http_status_as_error(false)`) so callers can format a
16/// provider-specific error without re-plumbing the 3.x error enum.
17fn http_post_json(
18    url: &str,
19    api_key: &str,
20    extra_headers: &[(&str, &str)],
21    payload: String,
22    read_timeout_secs: u64,
23) -> Result<(u16, String), String> {
24    let agent: ureq::Agent = ureq::Agent::config_builder()
25        .timeout_connect(Some(Duration::from_secs(10)))
26        .timeout_send_request(Some(Duration::from_secs(30)))
27        .timeout_recv_response(Some(Duration::from_secs(read_timeout_secs)))
28        .timeout_recv_body(Some(Duration::from_secs(read_timeout_secs)))
29        .http_status_as_error(false)
30        .build()
31        .into();
32
33    let mut req = agent
34        .post(url)
35        .header("Content-Type", "application/json")
36        .header("Accept", "application/json");
37    for (k, v) in extra_headers {
38        req = req.header(*k, *v);
39    }
40    let trimmed_key = api_key.trim();
41    if !trimmed_key.is_empty() {
42        req = req.header("Authorization", &format!("Bearer {}", trimmed_key));
43    }
44
45    match req.send(payload) {
46        Ok(mut resp) => {
47            let status = resp.status().as_u16();
48            let body = resp
49                .body_mut()
50                .read_to_string()
51                .map_err(|err| format!("failed to read response body: {err}"))?;
52            Ok((status, body))
53        }
54        Err(err) => Err(format!("{err}")),
55    }
56}
57
58pub const DEFAULT_OPENAI_EMBEDDING_MODEL: &str = "text-embedding-3-small";
59pub const DEFAULT_OPENAI_API_BASE: &str = "https://api.openai.com/v1";
60pub const DEFAULT_OPENAI_PROMPT_MODEL: &str = "gpt-4.1-mini";
61pub const DEFAULT_ANTHROPIC_PROMPT_MODEL: &str = "claude-3-5-haiku-latest";
62pub const DEFAULT_ANTHROPIC_API_BASE: &str = "https://api.anthropic.com/v1";
63pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01";
64
65#[derive(Debug, Clone)]
66pub struct OpenAiEmbeddingRequest {
67    pub api_key: String,
68    pub model: String,
69    pub inputs: Vec<String>,
70    pub dimensions: Option<usize>,
71    pub api_base: String,
72}
73
74#[derive(Debug, Clone)]
75pub struct OpenAiEmbeddingResponse {
76    pub provider: &'static str,
77    pub model: String,
78    pub embeddings: Vec<Vec<f32>>,
79    pub prompt_tokens: Option<u64>,
80    pub total_tokens: Option<u64>,
81}
82
83#[derive(Debug, Clone)]
84pub struct OpenAiPromptRequest {
85    pub api_key: String,
86    pub model: String,
87    pub prompt: String,
88    pub temperature: Option<f32>,
89    pub seed: Option<u64>,
90    pub max_output_tokens: Option<usize>,
91    pub api_base: String,
92    pub stream: bool,
93}
94
95#[derive(Debug, Clone)]
96pub struct AnthropicPromptRequest {
97    pub api_key: String,
98    pub model: String,
99    pub prompt: String,
100    pub temperature: Option<f32>,
101    pub max_output_tokens: Option<usize>,
102    pub api_base: String,
103    pub anthropic_version: String,
104}
105
106#[derive(Debug, Clone)]
107pub struct AiPromptResponse {
108    pub provider: &'static str,
109    pub model: String,
110    pub output_text: String,
111    pub output_chunks: Option<Vec<String>>,
112    pub prompt_tokens: Option<u64>,
113    pub completion_tokens: Option<u64>,
114    pub total_tokens: Option<u64>,
115    pub stop_reason: Option<String>,
116}
117
118#[deprecated(
119    since = "1.0.0",
120    note = "use AiBatchClient::embed_batch for embeddings or openai_embeddings_async with AiTransport when token usage metadata is required"
121)]
122pub fn openai_embeddings(request: OpenAiEmbeddingRequest) -> RedDBResult<OpenAiEmbeddingResponse> {
123    if request.model.trim().is_empty() {
124        return Err(RedDBError::Query(
125            "OpenAI embedding model cannot be empty".to_string(),
126        ));
127    }
128    if request.inputs.is_empty() {
129        return Err(RedDBError::Query(
130            "at least one input is required for embeddings".to_string(),
131        ));
132    }
133
134    let url = format!("{}/embeddings", request.api_base.trim_end_matches('/'));
135    let payload =
136        build_openai_embedding_payload(&request.model, &request.inputs, request.dimensions);
137
138    let (status, body) = http_post_json(&url, &request.api_key, &[], payload, 90)
139        .map_err(|err| RedDBError::Query(format!("OpenAI transport error: {err}")))?;
140
141    if !(200..300).contains(&status) {
142        let message = openai_error_message(&body)
143            .unwrap_or_else(|| "OpenAI embeddings request failed".to_string());
144        return Err(RedDBError::Query(format!(
145            "OpenAI embeddings request failed (status {status}): {message}"
146        )));
147    }
148
149    parse_openai_embedding_response(&body)
150}
151
152#[deprecated(since = "1.0.0", note = "use openai_prompt_async with AiTransport")]
153pub fn openai_prompt(request: OpenAiPromptRequest) -> RedDBResult<AiPromptResponse> {
154    if request.model.trim().is_empty() {
155        return Err(RedDBError::Query(
156            "OpenAI prompt model cannot be empty".to_string(),
157        ));
158    }
159    if request.prompt.trim().is_empty() {
160        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
161    }
162
163    let url = format!(
164        "{}/chat/completions",
165        request.api_base.trim_end_matches('/')
166    );
167    let payload = build_openai_prompt_payload(
168        &request.model,
169        &request.prompt,
170        request.temperature,
171        request.seed,
172        request.max_output_tokens,
173        false,
174    );
175
176    let (status, body) = http_post_json(&url, &request.api_key, &[], payload, 120)
177        .map_err(|err| RedDBError::Query(format!("OpenAI transport error: {err}")))?;
178
179    if !(200..300).contains(&status) {
180        let message = openai_error_message(&body)
181            .unwrap_or_else(|| "OpenAI prompt request failed".to_string());
182        return Err(RedDBError::Query(format!(
183            "OpenAI prompt request failed (status {status}): {message}"
184        )));
185    }
186
187    parse_openai_prompt_response(&body, &request.model)
188}
189
190#[deprecated(since = "1.0.0", note = "use anthropic_prompt_async with AiTransport")]
191pub fn anthropic_prompt(request: AnthropicPromptRequest) -> RedDBResult<AiPromptResponse> {
192    if request.api_key.trim().is_empty() {
193        return Err(RedDBError::Query(
194            "Anthropic API key cannot be empty".to_string(),
195        ));
196    }
197    if request.model.trim().is_empty() {
198        return Err(RedDBError::Query(
199            "Anthropic model cannot be empty".to_string(),
200        ));
201    }
202    if request.prompt.trim().is_empty() {
203        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
204    }
205
206    let url = format!("{}/messages", request.api_base.trim_end_matches('/'));
207    let payload = build_anthropic_prompt_payload(
208        &request.model,
209        &request.prompt,
210        request.temperature,
211        request.max_output_tokens,
212    );
213
214    // Anthropic uses its own `x-api-key` header instead of
215    // `Authorization: Bearer`, so skip the shared helper's default
216    // auth header path — we pass an empty API key and set
217    // `x-api-key` via extra_headers instead.
218    let extra = [
219        ("x-api-key", request.api_key.as_str()),
220        ("anthropic-version", request.anthropic_version.as_str()),
221    ];
222    let (status, body) = http_post_json(&url, "", &extra, payload, 120)
223        .map_err(|err| RedDBError::Query(format!("Anthropic transport error: {err}")))?;
224
225    if !(200..300).contains(&status) {
226        let message = anthropic_error_message(&body)
227            .unwrap_or_else(|| "Anthropic prompt request failed".to_string());
228        return Err(RedDBError::Query(format!(
229            "Anthropic prompt request failed (status {status}): {message}"
230        )));
231    }
232
233    parse_anthropic_prompt_response(&body, &request.model)
234}
235
236/// Async OpenAI-compatible embeddings via [`AiTransport`].
237///
238/// Uses the transport's connection pool and retry policy (429/5xx backoff)
239/// instead of the deprecated one-shot blocking path.
240pub async fn openai_embeddings_async(
241    transport: &crate::runtime::ai::transport::AiTransport,
242    request: OpenAiEmbeddingRequest,
243) -> RedDBResult<OpenAiEmbeddingResponse> {
244    if request.model.trim().is_empty() {
245        return Err(RedDBError::Query(
246            "OpenAI embedding model cannot be empty".to_string(),
247        ));
248    }
249    if request.inputs.is_empty() {
250        return Err(RedDBError::Query(
251            "at least one input is required for embeddings".to_string(),
252        ));
253    }
254
255    let url = format!("{}/embeddings", request.api_base.trim_end_matches('/'));
256    let payload =
257        build_openai_embedding_payload(&request.model, &request.inputs, request.dimensions);
258    let mut http_req =
259        crate::runtime::ai::transport::AiHttpRequest::post_json("openai-compatible", url, payload);
260    let trimmed_key = request.api_key.trim();
261    if !trimmed_key.is_empty() {
262        http_req = http_req.header("authorization", format!("Bearer {}", trimmed_key));
263    }
264
265    let response = transport
266        .request(http_req)
267        .await
268        .map_err(|e| RedDBError::Query(e.to_string()))?;
269
270    parse_openai_embedding_response(&response.body)
271}
272
273/// Async OpenAI chat-completion prompt via [`AiTransport`].
274///
275/// Uses the transport's connection pool and retry policy (429/5xx backoff)
276/// instead of the deprecated one-shot blocking path.
277pub async fn openai_prompt_async(
278    transport: &crate::runtime::ai::transport::AiTransport,
279    request: OpenAiPromptRequest,
280) -> RedDBResult<AiPromptResponse> {
281    if request.model.trim().is_empty() {
282        return Err(RedDBError::Query(
283            "OpenAI prompt model cannot be empty".to_string(),
284        ));
285    }
286    if request.prompt.trim().is_empty() {
287        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
288    }
289
290    let url = format!(
291        "{}/chat/completions",
292        request.api_base.trim_end_matches('/')
293    );
294    let payload = build_openai_prompt_payload(
295        &request.model,
296        &request.prompt,
297        request.temperature,
298        request.seed,
299        request.max_output_tokens,
300        request.stream,
301    );
302    let http_req = crate::runtime::ai::transport::AiHttpRequest::post_json("openai", url, payload)
303        .model(request.model.clone())
304        .header("authorization", format!("Bearer {}", request.api_key));
305
306    let response = transport
307        .request(http_req)
308        .await
309        .map_err(|e| RedDBError::Query(e.to_string()))?;
310
311    if request.stream {
312        parse_openai_streaming_prompt_response(&response.body, &request.model)
313    } else {
314        parse_openai_prompt_response(&response.body, &request.model)
315    }
316}
317
318/// Blocking OpenAI-compatible streaming prompt.
319///
320/// This is used by the socket-level `ASK ... STREAM` path so each provider
321/// `delta.content` can be forwarded to the HTTP client before the provider
322/// body has completed.
323pub fn openai_prompt_streaming(
324    request: OpenAiPromptRequest,
325    mut on_chunk: impl FnMut(&str) -> RedDBResult<()>,
326) -> RedDBResult<AiPromptResponse> {
327    if request.model.trim().is_empty() {
328        return Err(RedDBError::Query(
329            "OpenAI prompt model cannot be empty".to_string(),
330        ));
331    }
332    if request.prompt.trim().is_empty() {
333        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
334    }
335
336    let url = format!(
337        "{}/chat/completions",
338        request.api_base.trim_end_matches('/')
339    );
340    let payload = build_openai_prompt_payload(
341        &request.model,
342        &request.prompt,
343        request.temperature,
344        request.seed,
345        request.max_output_tokens,
346        true,
347    );
348
349    let agent: ureq::Agent = ureq::Agent::config_builder()
350        .timeout_connect(Some(Duration::from_secs(10)))
351        .timeout_send_request(Some(Duration::from_secs(30)))
352        .timeout_recv_response(Some(Duration::from_secs(120)))
353        .timeout_recv_body(Some(Duration::from_secs(120)))
354        .http_status_as_error(false)
355        .build()
356        .into();
357
358    let mut req = agent
359        .post(&url)
360        .header("content-type", "application/json")
361        .header("accept", "text/event-stream");
362    let trimmed_key = request.api_key.trim();
363    if !trimmed_key.is_empty() {
364        req = req.header("authorization", &format!("Bearer {}", trimmed_key));
365    }
366
367    let mut response = req
368        .send(payload)
369        .map_err(|err| RedDBError::Query(format!("OpenAI transport error: {err}")))?;
370    let status = response.status().as_u16();
371    if !(200..300).contains(&status) {
372        let body = response
373            .body_mut()
374            .read_to_string()
375            .unwrap_or_else(|err| format!("failed to read response body: {err}"));
376        let message = openai_error_message(&body)
377            .unwrap_or_else(|| "OpenAI prompt request failed".to_string());
378        return Err(RedDBError::Query(format!(
379            "OpenAI prompt request failed (status {status}): {message}"
380        )));
381    }
382
383    let mut model = request.model;
384    let mut chunks = Vec::new();
385    let mut prompt_tokens = None;
386    let mut completion_tokens = None;
387    let mut total_tokens = None;
388    let mut stop_reason = None;
389
390    let mut reader = std::io::BufReader::new(response.body_mut().as_reader());
391    let mut line = String::new();
392    loop {
393        line.clear();
394        let read = reader.read_line(&mut line).map_err(|err| {
395            RedDBError::Query(format!("failed to read OpenAI streaming response: {err}"))
396        })?;
397        if read == 0 {
398            break;
399        }
400
401        let trimmed = line.trim();
402        let Some(data) = trimmed.strip_prefix("data:") else {
403            continue;
404        };
405        let data = data.trim();
406        if data.is_empty() {
407            continue;
408        }
409        if data == "[DONE]" {
410            break;
411        }
412
413        let parsed = parse_json(data).map_err(|err| {
414            RedDBError::Query(format!(
415                "invalid OpenAI streaming prompt JSON response: {err}"
416            ))
417        })?;
418        let json = JsonValue::from(parsed);
419        if let Some(value) = json.get("model").and_then(JsonValue::as_str) {
420            model = value.to_string();
421        }
422        if let Some(usage) = json.get("usage") {
423            prompt_tokens = usage
424                .get("prompt_tokens")
425                .and_then(JsonValue::as_i64)
426                .and_then(|value| u64::try_from(value).ok())
427                .or(prompt_tokens);
428            completion_tokens = usage
429                .get("completion_tokens")
430                .and_then(JsonValue::as_i64)
431                .and_then(|value| u64::try_from(value).ok())
432                .or(completion_tokens);
433            total_tokens = usage
434                .get("total_tokens")
435                .and_then(JsonValue::as_i64)
436                .and_then(|value| u64::try_from(value).ok())
437                .or(total_tokens);
438        }
439
440        let Some(choices) = json.get("choices").and_then(JsonValue::as_array) else {
441            continue;
442        };
443        let Some(first_choice) = choices.first() else {
444            continue;
445        };
446        if let Some(reason) = first_choice
447            .get("finish_reason")
448            .and_then(JsonValue::as_str)
449        {
450            stop_reason = Some(reason.to_string());
451        }
452        if let Some(text) = first_choice
453            .get("delta")
454            .and_then(|delta| delta.get("content"))
455            .and_then(JsonValue::as_str)
456        {
457            if !text.is_empty() {
458                on_chunk(text)?;
459                chunks.push(text.to_string());
460            }
461        }
462    }
463
464    if chunks.is_empty() {
465        return Err(RedDBError::Query(
466            "OpenAI streaming prompt response missing text content".to_string(),
467        ));
468    }
469
470    let output_text = chunks.concat();
471    let total_tokens = total_tokens.or_else(|| match (prompt_tokens, completion_tokens) {
472        (Some(prompt), Some(completion)) => Some(prompt.saturating_add(completion)),
473        _ => None,
474    });
475
476    Ok(AiPromptResponse {
477        provider: "openai",
478        model,
479        output_text,
480        output_chunks: Some(chunks),
481        prompt_tokens,
482        completion_tokens,
483        total_tokens,
484        stop_reason,
485    })
486}
487
488/// Async Anthropic messages-API prompt via [`AiTransport`].
489///
490/// Uses the transport's connection pool and retry policy (429/5xx backoff)
491/// instead of the deprecated one-shot blocking path.
492pub async fn anthropic_prompt_async(
493    transport: &crate::runtime::ai::transport::AiTransport,
494    request: AnthropicPromptRequest,
495) -> RedDBResult<AiPromptResponse> {
496    if request.api_key.trim().is_empty() {
497        return Err(RedDBError::Query(
498            "Anthropic API key cannot be empty".to_string(),
499        ));
500    }
501    if request.model.trim().is_empty() {
502        return Err(RedDBError::Query(
503            "Anthropic model cannot be empty".to_string(),
504        ));
505    }
506    if request.prompt.trim().is_empty() {
507        return Err(RedDBError::Query("prompt cannot be empty".to_string()));
508    }
509
510    let url = format!("{}/messages", request.api_base.trim_end_matches('/'));
511    let payload = build_anthropic_prompt_payload(
512        &request.model,
513        &request.prompt,
514        request.temperature,
515        request.max_output_tokens,
516    );
517    let http_req =
518        crate::runtime::ai::transport::AiHttpRequest::post_json("anthropic", url, payload)
519            .model(request.model.clone())
520            .header("x-api-key", request.api_key)
521            .header("anthropic-version", request.anthropic_version);
522
523    let response = transport
524        .request(http_req)
525        .await
526        .map_err(|e| RedDBError::Query(e.to_string()))?;
527
528    parse_anthropic_prompt_response(&response.body, &request.model)
529}
530
531/// Build an OpenAI-compatible embedding request payload.
532pub(crate) fn build_embedding_payload(model: &str, inputs: &[String]) -> String {
533    build_openai_embedding_payload(model, inputs, None)
534}
535
536/// Parse an OpenAI-compatible embedding response, returning only the vectors.
537pub(crate) fn parse_embedding_vectors(body: &str) -> Result<Vec<Vec<f32>>, String> {
538    parse_openai_embedding_response(body)
539        .map(|r| r.embeddings)
540        .map_err(|e| e.to_string())
541}
542
543pub(crate) fn parse_embedding_response(body: &str) -> Result<OpenAiEmbeddingResponse, String> {
544    parse_openai_embedding_response(body).map_err(|e| e.to_string())
545}
546
547fn build_openai_embedding_payload(
548    model: &str,
549    inputs: &[String],
550    dimensions: Option<usize>,
551) -> String {
552    let mut object = Map::new();
553    object.insert("model".to_string(), JsonValue::String(model.to_string()));
554    if inputs.len() == 1 {
555        object.insert("input".to_string(), JsonValue::String(inputs[0].clone()));
556    } else {
557        object.insert(
558            "input".to_string(),
559            JsonValue::Array(inputs.iter().cloned().map(JsonValue::String).collect()),
560        );
561    }
562    if let Some(dimensions) = dimensions {
563        object.insert(
564            "dimensions".to_string(),
565            JsonValue::Number(dimensions as f64),
566        );
567    }
568    object.insert(
569        "encoding_format".to_string(),
570        JsonValue::String("float".to_string()),
571    );
572    JsonValue::Object(object).to_string_compact()
573}
574
575fn openai_error_message(body: &str) -> Option<String> {
576    provider_error_message(body)
577}
578
579fn anthropic_error_message(body: &str) -> Option<String> {
580    provider_error_message(body)
581}
582
583fn provider_error_message(body: &str) -> Option<String> {
584    let parsed = parse_json(body).ok().map(JsonValue::from)?;
585    let error = parsed.get("error")?;
586    if let Some(message) = error.get("message").and_then(JsonValue::as_str) {
587        let trimmed = message.trim();
588        if !trimmed.is_empty() {
589            return Some(trimmed.to_string());
590        }
591    }
592    None
593}
594
595fn build_openai_prompt_payload(
596    model: &str,
597    prompt: &str,
598    temperature: Option<f32>,
599    seed: Option<u64>,
600    max_output_tokens: Option<usize>,
601    stream: bool,
602) -> String {
603    let mut object = Map::new();
604    object.insert("model".to_string(), JsonValue::String(model.to_string()));
605
606    let mut message = Map::new();
607    message.insert("role".to_string(), JsonValue::String("user".to_string()));
608    message.insert("content".to_string(), JsonValue::String(prompt.to_string()));
609    object.insert(
610        "messages".to_string(),
611        JsonValue::Array(vec![JsonValue::Object(message)]),
612    );
613
614    if let Some(temperature) = temperature {
615        object.insert(
616            "temperature".to_string(),
617            JsonValue::Number(temperature as f64),
618        );
619    }
620
621    if let Some(seed) = seed {
622        object.insert("seed".to_string(), JsonValue::Number(seed as f64));
623    }
624
625    if let Some(max_output_tokens) = max_output_tokens {
626        object.insert(
627            "max_tokens".to_string(),
628            JsonValue::Number(max_output_tokens as f64),
629        );
630    }
631
632    if stream {
633        object.insert("stream".to_string(), JsonValue::Bool(true));
634        let mut options = Map::new();
635        options.insert("include_usage".to_string(), JsonValue::Bool(true));
636        object.insert("stream_options".to_string(), JsonValue::Object(options));
637    }
638
639    JsonValue::Object(object).to_string_compact()
640}
641
642fn build_anthropic_prompt_payload(
643    model: &str,
644    prompt: &str,
645    temperature: Option<f32>,
646    max_output_tokens: Option<usize>,
647) -> String {
648    let mut object = Map::new();
649    object.insert("model".to_string(), JsonValue::String(model.to_string()));
650    object.insert(
651        "max_tokens".to_string(),
652        JsonValue::Number(max_output_tokens.unwrap_or(512) as f64),
653    );
654
655    let mut message = Map::new();
656    message.insert("role".to_string(), JsonValue::String("user".to_string()));
657    message.insert("content".to_string(), JsonValue::String(prompt.to_string()));
658    object.insert(
659        "messages".to_string(),
660        JsonValue::Array(vec![JsonValue::Object(message)]),
661    );
662
663    if let Some(temperature) = temperature {
664        object.insert(
665            "temperature".to_string(),
666            JsonValue::Number(temperature as f64),
667        );
668    }
669
670    JsonValue::Object(object).to_string_compact()
671}
672
673fn extract_text_from_parts(parts: &[JsonValue]) -> Option<String> {
674    let mut chunks = Vec::new();
675    for part in parts {
676        if let Some(text) = part.as_str() {
677            let trimmed = text.trim();
678            if !trimmed.is_empty() {
679                chunks.push(trimmed.to_string());
680            }
681            continue;
682        }
683
684        let Some(object) = part.as_object() else {
685            continue;
686        };
687        let Some(text) = object.get("text").and_then(JsonValue::as_str) else {
688            continue;
689        };
690        let trimmed = text.trim();
691        if !trimmed.is_empty() {
692            chunks.push(trimmed.to_string());
693        }
694    }
695
696    if chunks.is_empty() {
697        None
698    } else {
699        Some(chunks.join("\n\n"))
700    }
701}
702
703fn parse_openai_prompt_response(
704    body: &str,
705    requested_model: &str,
706) -> RedDBResult<AiPromptResponse> {
707    let parsed = parse_json(body)
708        .map_err(|err| RedDBError::Query(format!("invalid OpenAI prompt JSON response: {err}")))?;
709    let json = JsonValue::from(parsed);
710
711    let model = json
712        .get("model")
713        .and_then(JsonValue::as_str)
714        .unwrap_or(requested_model)
715        .to_string();
716
717    let Some(choices) = json.get("choices").and_then(JsonValue::as_array) else {
718        return Err(RedDBError::Query(
719            "OpenAI prompt response missing 'choices' array".to_string(),
720        ));
721    };
722    let Some(first_choice) = choices.first() else {
723        return Err(RedDBError::Query(
724            "OpenAI prompt response contains no choices".to_string(),
725        ));
726    };
727
728    let output_text = first_choice
729        .get("message")
730        .and_then(|message| {
731            if let Some(text) = message.get("content").and_then(JsonValue::as_str) {
732                let trimmed = text.trim();
733                if !trimmed.is_empty() {
734                    return Some(trimmed.to_string());
735                }
736            }
737            message
738                .get("content")
739                .and_then(JsonValue::as_array)
740                .and_then(extract_text_from_parts)
741        })
742        .ok_or_else(|| {
743            RedDBError::Query("OpenAI prompt response missing text content".to_string())
744        })?;
745
746    let prompt_tokens = json
747        .get("usage")
748        .and_then(|usage| usage.get("prompt_tokens"))
749        .and_then(JsonValue::as_i64)
750        .and_then(|value| u64::try_from(value).ok());
751    let completion_tokens = json
752        .get("usage")
753        .and_then(|usage| usage.get("completion_tokens"))
754        .and_then(JsonValue::as_i64)
755        .and_then(|value| u64::try_from(value).ok());
756    let total_tokens = json
757        .get("usage")
758        .and_then(|usage| usage.get("total_tokens"))
759        .and_then(JsonValue::as_i64)
760        .and_then(|value| u64::try_from(value).ok())
761        .or_else(|| match (prompt_tokens, completion_tokens) {
762            (Some(prompt), Some(completion)) => Some(prompt.saturating_add(completion)),
763            _ => None,
764        });
765
766    let stop_reason = first_choice
767        .get("finish_reason")
768        .and_then(JsonValue::as_str)
769        .map(str::to_string);
770
771    Ok(AiPromptResponse {
772        provider: "openai",
773        model,
774        output_text,
775        output_chunks: None,
776        prompt_tokens,
777        completion_tokens,
778        total_tokens,
779        stop_reason,
780    })
781}
782
783fn parse_openai_streaming_prompt_response(
784    body: &str,
785    requested_model: &str,
786) -> RedDBResult<AiPromptResponse> {
787    let mut model = requested_model.to_string();
788    let mut chunks = Vec::new();
789    let mut prompt_tokens = None;
790    let mut completion_tokens = None;
791    let mut total_tokens = None;
792    let mut stop_reason = None;
793
794    for line in body.lines() {
795        let line = line.trim();
796        let Some(data) = line.strip_prefix("data:") else {
797            continue;
798        };
799        let data = data.trim();
800        if data.is_empty() {
801            continue;
802        }
803        if data == "[DONE]" {
804            break;
805        }
806
807        let parsed = parse_json(data).map_err(|err| {
808            RedDBError::Query(format!(
809                "invalid OpenAI streaming prompt JSON response: {err}"
810            ))
811        })?;
812        let json = JsonValue::from(parsed);
813        if let Some(value) = json.get("model").and_then(JsonValue::as_str) {
814            model = value.to_string();
815        }
816        if let Some(usage) = json.get("usage") {
817            prompt_tokens = usage
818                .get("prompt_tokens")
819                .and_then(JsonValue::as_i64)
820                .and_then(|value| u64::try_from(value).ok())
821                .or(prompt_tokens);
822            completion_tokens = usage
823                .get("completion_tokens")
824                .and_then(JsonValue::as_i64)
825                .and_then(|value| u64::try_from(value).ok())
826                .or(completion_tokens);
827            total_tokens = usage
828                .get("total_tokens")
829                .and_then(JsonValue::as_i64)
830                .and_then(|value| u64::try_from(value).ok())
831                .or(total_tokens);
832        }
833
834        let Some(choices) = json.get("choices").and_then(JsonValue::as_array) else {
835            continue;
836        };
837        let Some(first_choice) = choices.first() else {
838            continue;
839        };
840        if let Some(reason) = first_choice
841            .get("finish_reason")
842            .and_then(JsonValue::as_str)
843        {
844            stop_reason = Some(reason.to_string());
845        }
846        if let Some(text) = first_choice
847            .get("delta")
848            .and_then(|delta| delta.get("content"))
849            .and_then(JsonValue::as_str)
850        {
851            if !text.is_empty() {
852                chunks.push(text.to_string());
853            }
854        }
855    }
856
857    if chunks.is_empty() {
858        return Err(RedDBError::Query(
859            "OpenAI streaming prompt response missing text content".to_string(),
860        ));
861    }
862
863    let output_text = chunks.concat();
864    let total_tokens = total_tokens.or_else(|| match (prompt_tokens, completion_tokens) {
865        (Some(prompt), Some(completion)) => Some(prompt.saturating_add(completion)),
866        _ => None,
867    });
868
869    Ok(AiPromptResponse {
870        provider: "openai",
871        model,
872        output_text,
873        output_chunks: Some(chunks),
874        prompt_tokens,
875        completion_tokens,
876        total_tokens,
877        stop_reason,
878    })
879}
880
881fn parse_anthropic_prompt_response(
882    body: &str,
883    requested_model: &str,
884) -> RedDBResult<AiPromptResponse> {
885    let parsed = parse_json(body).map_err(|err| {
886        RedDBError::Query(format!("invalid Anthropic prompt JSON response: {err}"))
887    })?;
888    let json = JsonValue::from(parsed);
889
890    let model = json
891        .get("model")
892        .and_then(JsonValue::as_str)
893        .unwrap_or(requested_model)
894        .to_string();
895
896    let Some(content_parts) = json.get("content").and_then(JsonValue::as_array) else {
897        return Err(RedDBError::Query(
898            "Anthropic prompt response missing 'content' array".to_string(),
899        ));
900    };
901
902    let output_text = extract_text_from_parts(content_parts).ok_or_else(|| {
903        RedDBError::Query("Anthropic prompt response missing text content".to_string())
904    })?;
905
906    let prompt_tokens = json
907        .get("usage")
908        .and_then(|usage| usage.get("input_tokens"))
909        .and_then(JsonValue::as_i64)
910        .and_then(|value| u64::try_from(value).ok());
911    let completion_tokens = json
912        .get("usage")
913        .and_then(|usage| usage.get("output_tokens"))
914        .and_then(JsonValue::as_i64)
915        .and_then(|value| u64::try_from(value).ok());
916    let total_tokens = match (prompt_tokens, completion_tokens) {
917        (Some(prompt), Some(completion)) => Some(prompt.saturating_add(completion)),
918        _ => None,
919    };
920
921    let stop_reason = json
922        .get("stop_reason")
923        .and_then(JsonValue::as_str)
924        .map(str::to_string);
925
926    Ok(AiPromptResponse {
927        provider: "anthropic",
928        model,
929        output_text,
930        output_chunks: None,
931        prompt_tokens,
932        completion_tokens,
933        total_tokens,
934        stop_reason,
935    })
936}
937
938fn parse_openai_embedding_response(body: &str) -> RedDBResult<OpenAiEmbeddingResponse> {
939    let parsed = parse_json(body).map_err(|err| {
940        RedDBError::Query(format!("invalid OpenAI embeddings JSON response: {err}"))
941    })?;
942    let json = JsonValue::from(parsed);
943
944    let model = json
945        .get("model")
946        .and_then(JsonValue::as_str)
947        .unwrap_or(DEFAULT_OPENAI_EMBEDDING_MODEL)
948        .to_string();
949
950    let Some(data) = json.get("data").and_then(JsonValue::as_array) else {
951        return Err(RedDBError::Query(
952            "OpenAI response missing 'data' array".to_string(),
953        ));
954    };
955
956    let mut rows: Vec<(usize, Vec<f32>)> = Vec::with_capacity(data.len());
957    for (position, item) in data.iter().enumerate() {
958        let index = item
959            .get("index")
960            .and_then(JsonValue::as_i64)
961            .and_then(|value| usize::try_from(value).ok())
962            .unwrap_or(position);
963
964        let Some(embedding_values) = item.get("embedding").and_then(JsonValue::as_array) else {
965            return Err(RedDBError::Query(
966                "OpenAI response contains item without 'embedding' array".to_string(),
967            ));
968        };
969        if embedding_values.is_empty() {
970            return Err(RedDBError::Query(
971                "OpenAI response contains empty embedding vector".to_string(),
972            ));
973        }
974
975        let mut embedding = Vec::with_capacity(embedding_values.len());
976        for value in embedding_values {
977            let Some(number) = value.as_f64() else {
978                return Err(RedDBError::Query(
979                    "OpenAI response contains non-numeric embedding value".to_string(),
980                ));
981            };
982            embedding.push(number as f32);
983        }
984        rows.push((index, embedding));
985    }
986    rows.sort_by_key(|(index, _)| *index);
987    let embeddings = rows.into_iter().map(|(_, embedding)| embedding).collect();
988
989    let prompt_tokens = json
990        .get("usage")
991        .and_then(|usage| usage.get("prompt_tokens"))
992        .and_then(JsonValue::as_i64)
993        .and_then(|value| u64::try_from(value).ok());
994    let total_tokens = json
995        .get("usage")
996        .and_then(|usage| usage.get("total_tokens"))
997        .and_then(JsonValue::as_i64)
998        .and_then(|value| u64::try_from(value).ok());
999
1000    Ok(OpenAiEmbeddingResponse {
1001        provider: "openai",
1002        model,
1003        embeddings,
1004        prompt_tokens,
1005        total_tokens,
1006    })
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use super::*;
1012
1013    #[test]
1014    fn parse_openai_embedding_response_extracts_vectors() {
1015        let body = r#"{
1016          "object":"list",
1017          "data":[
1018            {"object":"embedding","index":1,"embedding":[0.3,0.4]},
1019            {"object":"embedding","index":0,"embedding":[0.1,0.2]}
1020          ],
1021          "model":"text-embedding-3-small",
1022          "usage":{"prompt_tokens":12,"total_tokens":12}
1023        }"#;
1024
1025        let result = parse_openai_embedding_response(body).expect("response should parse");
1026        assert_eq!(result.provider, "openai");
1027        assert_eq!(result.model, "text-embedding-3-small");
1028        assert_eq!(result.embeddings.len(), 2);
1029        assert_eq!(result.embeddings[0], vec![0.1, 0.2]);
1030        assert_eq!(result.embeddings[1], vec![0.3, 0.4]);
1031        assert_eq!(result.prompt_tokens, Some(12));
1032        assert_eq!(result.total_tokens, Some(12));
1033    }
1034
1035    #[test]
1036    fn openai_error_message_extracts_nested_message() {
1037        let body = r#"{"error":{"message":"bad api key","type":"invalid_request_error"}}"#;
1038        assert_eq!(openai_error_message(body).as_deref(), Some("bad api key"));
1039    }
1040
1041    #[test]
1042    fn parse_openai_prompt_response_extracts_text_and_usage() {
1043        let body = r#"{
1044          "id":"chatcmpl_1",
1045          "object":"chat.completion",
1046          "model":"gpt-4.1-mini",
1047          "choices":[
1048            {
1049              "index":0,
1050              "finish_reason":"stop",
1051              "message":{"role":"assistant","content":"Resumo pronto."}
1052            }
1053          ],
1054          "usage":{"prompt_tokens":10,"completion_tokens":4,"total_tokens":14}
1055        }"#;
1056
1057        let parsed =
1058            parse_openai_prompt_response(body, DEFAULT_OPENAI_PROMPT_MODEL).expect("parse");
1059        assert_eq!(parsed.provider, "openai");
1060        assert_eq!(parsed.model, "gpt-4.1-mini");
1061        assert_eq!(parsed.output_text, "Resumo pronto.");
1062        assert_eq!(parsed.prompt_tokens, Some(10));
1063        assert_eq!(parsed.completion_tokens, Some(4));
1064        assert_eq!(parsed.total_tokens, Some(14));
1065        assert_eq!(parsed.stop_reason.as_deref(), Some("stop"));
1066    }
1067
1068    #[test]
1069    fn parse_anthropic_prompt_response_extracts_text_and_usage() {
1070        let body = r#"{
1071          "id":"msg_1",
1072          "model":"claude-3-5-haiku-latest",
1073          "type":"message",
1074          "content":[{"type":"text","text":"Action complete."}],
1075          "usage":{"input_tokens":11,"output_tokens":5},
1076          "stop_reason":"end_turn"
1077        }"#;
1078
1079        let parsed =
1080            parse_anthropic_prompt_response(body, DEFAULT_ANTHROPIC_PROMPT_MODEL).expect("parse");
1081        assert_eq!(parsed.provider, "anthropic");
1082        assert_eq!(parsed.model, "claude-3-5-haiku-latest");
1083        assert_eq!(parsed.output_text, "Action complete.");
1084        assert_eq!(parsed.prompt_tokens, Some(11));
1085        assert_eq!(parsed.completion_tokens, Some(5));
1086        assert_eq!(parsed.total_tokens, Some(16));
1087        assert_eq!(parsed.stop_reason.as_deref(), Some("end_turn"));
1088    }
1089
1090    #[test]
1091    fn resolve_api_key_prefers_vault_secret_over_legacy_config() {
1092        let provider = AiProvider::OpenAi;
1093        let alias = "vault_unit_alias";
1094        let secret_path = ai_api_secret_path(&provider, alias);
1095        let legacy_key = ai_api_legacy_config_key(&provider, alias);
1096
1097        let resolved = resolve_api_key(&provider, Some(alias), |key| {
1098            if key == secret_path {
1099                Ok(Some("vault-key".to_string()))
1100            } else if key == legacy_key {
1101                Ok(Some("legacy-key".to_string()))
1102            } else {
1103                Ok(None)
1104            }
1105        })
1106        .expect("resolve");
1107
1108        assert_eq!(resolved, "vault-key");
1109    }
1110
1111    #[test]
1112    fn resolve_api_key_uses_default_vault_secret_path() {
1113        let provider = AiProvider::OpenAi;
1114        let secret_path = ai_api_secret_path(&provider, "default");
1115
1116        let resolved = resolve_api_key(&provider, None, |key| {
1117            if key == secret_path {
1118                Ok(Some("default-vault-key".to_string()))
1119            } else {
1120                Ok(None)
1121            }
1122        })
1123        .expect("resolve");
1124
1125        assert_eq!(resolved, "default-vault-key");
1126    }
1127
1128    // Vault-first credential resolution with env fallback (issue #1270).
1129    // Each test uses a unique `Custom` provider token so the derived env
1130    // var name (`REDDB_<TOKEN>_API_KEY`) is process-unique and the tests
1131    // can set/unset env without racing other tests.
1132
1133    #[test]
1134    fn resolve_api_key_uses_env_when_no_vault_entry() {
1135        let provider = AiProvider::Custom("cred1270envonly".to_string());
1136        let env_name = provider.default_key_env_name();
1137        std::env::set_var(&env_name, "env-fallback-key");
1138
1139        // kv_getter returns nothing → no vault/legacy entry exists.
1140        let resolved = resolve_api_key(&provider, None, |_| Ok(None));
1141
1142        std::env::remove_var(&env_name);
1143        assert_eq!(resolved.expect("resolve"), "env-fallback-key");
1144    }
1145
1146    #[test]
1147    fn resolve_api_key_prefers_vault_over_env() {
1148        let provider = AiProvider::Custom("cred1270both".to_string());
1149        let env_name = provider.default_key_env_name();
1150        let secret_path = ai_api_secret_path(&provider, "default");
1151        std::env::set_var(&env_name, "env-fallback-key");
1152
1153        // Both the vault secret and the env var are set; vault wins.
1154        let resolved = resolve_api_key(&provider, None, |key| {
1155            if key == secret_path {
1156                Ok(Some("vault-managed-key".to_string()))
1157            } else {
1158                Ok(None)
1159            }
1160        });
1161
1162        std::env::remove_var(&env_name);
1163        assert_eq!(resolved.expect("resolve"), "vault-managed-key");
1164    }
1165
1166    #[test]
1167    fn resolve_api_key_alias_prefers_vault_over_env() {
1168        let provider = AiProvider::Custom("cred1270alias".to_string());
1169        let alias = "prod";
1170        let env_name = provider.alias_key_env_name(alias);
1171        let secret_path = ai_api_secret_path(&provider, alias);
1172        std::env::set_var(&env_name, "env-alias-key");
1173
1174        let resolved = resolve_api_key(&provider, Some(alias), |key| {
1175            if key == secret_path {
1176                Ok(Some("vault-alias-key".to_string()))
1177            } else {
1178                Ok(None)
1179            }
1180        });
1181
1182        std::env::remove_var(&env_name);
1183        assert_eq!(resolved.expect("resolve"), "vault-alias-key");
1184    }
1185
1186    #[test]
1187    fn resolve_api_key_alias_falls_back_to_env_without_vault() {
1188        let provider = AiProvider::Custom("cred1270aliasenv".to_string());
1189        let alias = "prod";
1190        let env_name = provider.alias_key_env_name(alias);
1191        std::env::set_var(&env_name, "env-alias-key");
1192
1193        let resolved = resolve_api_key(&provider, Some(alias), |_| Ok(None));
1194
1195        std::env::remove_var(&env_name);
1196        assert_eq!(resolved.expect("resolve"), "env-alias-key");
1197    }
1198
1199    #[test]
1200    fn openai_prompt_payload_includes_temperature_and_seed_when_present() {
1201        let payload = build_openai_prompt_payload(
1202            "gpt-4.1-mini",
1203            "hello",
1204            Some(0.0),
1205            Some(42),
1206            Some(128),
1207            false,
1208        );
1209        let parsed = JsonValue::from(parse_json(&payload).expect("valid json"));
1210
1211        assert_eq!(
1212            parsed.get("temperature").and_then(JsonValue::as_f64),
1213            Some(0.0)
1214        );
1215        assert_eq!(parsed.get("seed").and_then(JsonValue::as_u64), Some(42));
1216        assert_eq!(
1217            parsed.get("max_tokens").and_then(JsonValue::as_u64),
1218            Some(128)
1219        );
1220    }
1221
1222    #[test]
1223    fn openai_prompt_payload_omits_seed_when_none() {
1224        let payload =
1225            build_openai_prompt_payload("gpt-4.1-mini", "hello", Some(0.0), None, None, false);
1226        let parsed = JsonValue::from(parse_json(&payload).expect("valid json"));
1227
1228        assert!(parsed.get("seed").is_none());
1229        assert!(parsed.get("stream").is_none());
1230        assert_eq!(
1231            parsed.get("temperature").and_then(JsonValue::as_f64),
1232            Some(0.0)
1233        );
1234    }
1235
1236    #[test]
1237    fn openai_prompt_payload_enables_stream_options() {
1238        let payload =
1239            build_openai_prompt_payload("gpt-4.1-mini", "hello", Some(0.0), None, None, true);
1240        let parsed = JsonValue::from(parse_json(&payload).expect("valid json"));
1241
1242        assert_eq!(
1243            parsed.get("stream").and_then(JsonValue::as_bool),
1244            Some(true)
1245        );
1246        assert_eq!(
1247            parsed
1248                .get("stream_options")
1249                .and_then(|value| value.get("include_usage"))
1250                .and_then(JsonValue::as_bool),
1251            Some(true)
1252        );
1253    }
1254
1255    #[test]
1256    fn openai_streaming_prompt_response_collects_delta_chunks() {
1257        let body = concat!(
1258            "data: {\"model\":\"gpt-test\",\"choices\":[{\"delta\":{\"content\":\"login \"},\"finish_reason\":null}]}\n\n",
1259            "data: {\"model\":\"gpt-test\",\"choices\":[{\"delta\":{\"content\":\"failed\"},\"finish_reason\":null}]}\n\n",
1260            "data: {\"model\":\"gpt-test\",\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":12,\"completion_tokens\":2,\"total_tokens\":14}}\n\n",
1261            "data: [DONE]\n\n",
1262        );
1263        let parsed = parse_openai_streaming_prompt_response(body, "fallback").unwrap();
1264
1265        assert_eq!(parsed.model, "gpt-test");
1266        assert_eq!(parsed.output_text, "login failed");
1267        assert_eq!(
1268            parsed.output_chunks.as_deref(),
1269            Some(["login ".to_string(), "failed".to_string()].as_slice())
1270        );
1271        assert_eq!(parsed.prompt_tokens, Some(12));
1272        assert_eq!(parsed.completion_tokens, Some(2));
1273        assert_eq!(parsed.total_tokens, Some(14));
1274        assert_eq!(parsed.stop_reason.as_deref(), Some("stop"));
1275    }
1276
1277    #[tokio::test]
1278    async fn openai_prompt_async_rejects_empty_model() {
1279        let transport = crate::runtime::ai::transport::AiTransport::new(Default::default());
1280        let request = OpenAiPromptRequest {
1281            api_key: "key".to_string(),
1282            model: "  ".to_string(),
1283            prompt: "hello".to_string(),
1284            temperature: None,
1285            seed: None,
1286            max_output_tokens: None,
1287            api_base: "https://api.openai.com/v1".to_string(),
1288            stream: false,
1289        };
1290        let err = openai_prompt_async(&transport, request).await.unwrap_err();
1291        assert!(err.to_string().contains("model cannot be empty"));
1292    }
1293
1294    #[tokio::test]
1295    async fn openai_prompt_async_rejects_empty_prompt() {
1296        let transport = crate::runtime::ai::transport::AiTransport::new(Default::default());
1297        let request = OpenAiPromptRequest {
1298            api_key: "key".to_string(),
1299            model: "gpt-4.1-mini".to_string(),
1300            prompt: "".to_string(),
1301            temperature: None,
1302            seed: None,
1303            max_output_tokens: None,
1304            api_base: "https://api.openai.com/v1".to_string(),
1305            stream: false,
1306        };
1307        let err = openai_prompt_async(&transport, request).await.unwrap_err();
1308        assert!(err.to_string().contains("prompt cannot be empty"));
1309    }
1310
1311    // ========================================================================
1312    // openai-compat client tests (issue gh-516)
1313    //
1314    // Each test spins up a tiny TCP server, hands its base URL to the
1315    // new generic client, and asserts on the captured request +
1316    // synthesised response. Tests run in parallel-safe fashion (each
1317    // server binds to port 0).
1318    // ========================================================================
1319
1320    use std::io::{Read as _, Write as _};
1321    use std::net::TcpListener;
1322    use std::sync::{Arc, Mutex};
1323    use std::thread;
1324
1325    struct CapturedRequest {
1326        method: String,
1327        path: String,
1328        headers: Vec<(String, String)>,
1329        body: String,
1330    }
1331
1332    fn parse_http_request(stream: &mut std::net::TcpStream) -> CapturedRequest {
1333        let mut buf = [0u8; 8192];
1334        let mut data = Vec::new();
1335        loop {
1336            let read = stream.read(&mut buf).unwrap_or(0);
1337            if read == 0 {
1338                break;
1339            }
1340            data.extend_from_slice(&buf[..read]);
1341            if let Some(idx) = data.windows(4).position(|w| w == b"\r\n\r\n") {
1342                let header_len = idx + 4;
1343                let header_str = String::from_utf8_lossy(&data[..idx]).to_string();
1344                let mut lines = header_str.split("\r\n");
1345                let request_line = lines.next().unwrap_or("");
1346                let mut parts = request_line.split_whitespace();
1347                let method = parts.next().unwrap_or("").to_string();
1348                let path = parts.next().unwrap_or("").to_string();
1349                let mut headers = Vec::new();
1350                let mut content_length: usize = 0;
1351                for line in lines {
1352                    if let Some((k, v)) = line.split_once(':') {
1353                        let k = k.trim().to_string();
1354                        let v = v.trim().to_string();
1355                        if k.eq_ignore_ascii_case("content-length") {
1356                            content_length = v.parse().unwrap_or(0);
1357                        }
1358                        headers.push((k, v));
1359                    }
1360                }
1361                while data.len() < header_len + content_length {
1362                    let read = stream.read(&mut buf).unwrap_or(0);
1363                    if read == 0 {
1364                        break;
1365                    }
1366                    data.extend_from_slice(&buf[..read]);
1367                }
1368                let body = String::from_utf8_lossy(&data[header_len..header_len + content_length])
1369                    .to_string();
1370                return CapturedRequest {
1371                    method,
1372                    path,
1373                    headers,
1374                    body,
1375                };
1376            }
1377        }
1378        CapturedRequest {
1379            method: String::new(),
1380            path: String::new(),
1381            headers: Vec::new(),
1382            body: String::new(),
1383        }
1384    }
1385
1386    /// Spawn a one-shot HTTP server that replies with `(status, body)`
1387    /// to a single request, captures it, and returns `(base_url, captured)`.
1388    fn spawn_mock(
1389        status: u16,
1390        response_body: &'static str,
1391    ) -> (String, Arc<Mutex<Option<CapturedRequest>>>) {
1392        let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
1393        let addr = listener.local_addr().expect("addr");
1394        let captured: Arc<Mutex<Option<CapturedRequest>>> = Arc::new(Mutex::new(None));
1395        let captured_clone = Arc::clone(&captured);
1396        thread::spawn(move || {
1397            if let Ok((mut stream, _)) = listener.accept() {
1398                let req = parse_http_request(&mut stream);
1399                *captured_clone.lock().unwrap() = Some(req);
1400                let status_line = match status {
1401                    200 => "200 OK",
1402                    400 => "400 Bad Request",
1403                    401 => "401 Unauthorized",
1404                    500 => "500 Internal Server Error",
1405                    _ => "200 OK",
1406                };
1407                let resp = format!(
1408                    "HTTP/1.1 {status_line}\r\n\
1409                     Content-Type: application/json\r\n\
1410                     Content-Length: {}\r\n\
1411                     Connection: close\r\n\r\n{}",
1412                    response_body.len(),
1413                    response_body
1414                );
1415                let _ = stream.write_all(resp.as_bytes());
1416            }
1417        });
1418        (format!("http://{}", addr), captured)
1419    }
1420
1421    #[test]
1422    fn openai_compat_chat_roundtrip_honors_arbitrary_api_base_and_headers() {
1423        let body = r#"{
1424            "id":"chatcmpl_x",
1425            "model":"custom-model",
1426            "choices":[{"index":0,"finish_reason":"stop","message":{"role":"assistant","content":"hi"}}],
1427            "usage":{"prompt_tokens":7,"completion_tokens":2,"total_tokens":9}
1428        }"#;
1429        let (base, captured) = spawn_mock(200, body);
1430
1431        let req = OpenAiCompatChatRequest {
1432            api_base: base.clone(),
1433            api_key: "sk-test".to_string(),
1434            model: "custom-model".to_string(),
1435            prompt: "say hi".to_string(),
1436            temperature: None,
1437            seed: None,
1438            max_output_tokens: None,
1439            extra_headers: vec![("X-Custom-Tag".to_string(), "abc".to_string())],
1440        };
1441        let resp = openai_compat_chat(req).expect("ok");
1442
1443        assert_eq!(resp.output_text, "hi");
1444        assert_eq!(resp.model, "custom-model");
1445        assert_eq!(resp.usage.input_tokens, Some(7));
1446        assert_eq!(resp.usage.output_tokens, Some(2));
1447        assert_eq!(resp.usage.total_tokens, Some(9));
1448        assert_eq!(resp.stop_reason.as_deref(), Some("stop"));
1449
1450        let cap = captured.lock().unwrap().take().expect("captured");
1451        assert_eq!(cap.method, "POST");
1452        assert_eq!(cap.path, "/chat/completions");
1453        let has_auth = cap
1454            .headers
1455            .iter()
1456            .any(|(k, v)| k.eq_ignore_ascii_case("authorization") && v == "Bearer sk-test");
1457        assert!(has_auth, "Authorization header missing");
1458        let has_custom = cap
1459            .headers
1460            .iter()
1461            .any(|(k, v)| k.eq_ignore_ascii_case("x-custom-tag") && v == "abc");
1462        assert!(has_custom, "extra header missing");
1463        assert!(cap.body.contains("\"model\":\"custom-model\""));
1464    }
1465
1466    #[test]
1467    fn openai_compat_embeddings_roundtrip_with_dimensions() {
1468        let body = r#"{
1469            "object":"list",
1470            "model":"embed-model",
1471            "data":[{"object":"embedding","index":0,"embedding":[0.5,0.25]}],
1472            "usage":{"prompt_tokens":4,"total_tokens":4}
1473        }"#;
1474        let (base, captured) = spawn_mock(200, body);
1475
1476        let req = OpenAiCompatEmbeddingsRequest {
1477            api_base: base,
1478            api_key: "sk-emb".to_string(),
1479            model: "embed-model".to_string(),
1480            inputs: vec!["hello".to_string()],
1481            dimensions: Some(2),
1482            extra_headers: vec![],
1483        };
1484        let resp = openai_compat_embeddings(req).expect("ok");
1485
1486        assert_eq!(resp.embeddings.len(), 1);
1487        assert_eq!(resp.embeddings[0], vec![0.5_f32, 0.25_f32]);
1488        assert_eq!(resp.usage.total_tokens, Some(4));
1489        assert_eq!(resp.usage.input_tokens, Some(4));
1490
1491        let cap = captured.lock().unwrap().take().expect("captured");
1492        assert_eq!(cap.path, "/embeddings");
1493        assert!(cap.body.contains("\"dimensions\":2"));
1494    }
1495
1496    #[test]
1497    fn openai_compat_chat_non_2xx_returns_structured_error() {
1498        let body = r#"{"error":{"message":"bad api key","type":"invalid_request_error"}}"#;
1499        let (base, _captured) = spawn_mock(401, body);
1500
1501        let req = OpenAiCompatChatRequest {
1502            api_base: base,
1503            api_key: "bad".to_string(),
1504            model: "m".to_string(),
1505            prompt: "hi".to_string(),
1506            temperature: None,
1507            seed: None,
1508            max_output_tokens: None,
1509            extra_headers: vec![],
1510        };
1511        let err = openai_compat_chat(req).unwrap_err().to_string();
1512        assert!(err.contains("status 401"), "got: {err}");
1513        assert!(err.contains("bad api key"), "got: {err}");
1514    }
1515
1516    #[test]
1517    fn openai_compat_chat_rejects_empty_model_and_prompt() {
1518        let req = OpenAiCompatChatRequest {
1519            api_base: "http://localhost:1".to_string(),
1520            api_key: "k".to_string(),
1521            model: "  ".to_string(),
1522            prompt: "hi".to_string(),
1523            temperature: None,
1524            seed: None,
1525            max_output_tokens: None,
1526            extra_headers: vec![],
1527        };
1528        let err = openai_compat_chat(req).unwrap_err().to_string();
1529        assert!(err.contains("model cannot be empty"), "got: {err}");
1530
1531        let req = OpenAiCompatChatRequest {
1532            api_base: "http://localhost:1".to_string(),
1533            api_key: "k".to_string(),
1534            model: "m".to_string(),
1535            prompt: "  ".to_string(),
1536            temperature: None,
1537            seed: None,
1538            max_output_tokens: None,
1539            extra_headers: vec![],
1540        };
1541        let err = openai_compat_chat(req).unwrap_err().to_string();
1542        assert!(err.contains("prompt cannot be empty"), "got: {err}");
1543    }
1544
1545    #[test]
1546    fn parse_provider_mode_recognizes_all_three_tokens() {
1547        assert_eq!(
1548            parse_provider_mode("openai-compat"),
1549            Some(AiProviderMode::OpenAiCompat)
1550        );
1551        assert_eq!(
1552            parse_provider_mode("OPENAI_NATIVE"),
1553            Some(AiProviderMode::OpenAiNative)
1554        );
1555        assert_eq!(
1556            parse_provider_mode("anthropic-native"),
1557            Some(AiProviderMode::AnthropicNative)
1558        );
1559        assert_eq!(parse_provider_mode("groq"), None);
1560    }
1561
1562    #[test]
1563    fn resolve_provider_mode_reads_kv_key() {
1564        let kv = |key: &str| -> crate::RedDBResult<Option<String>> {
1565            if key == "red.config.ai.provider" {
1566                Ok(Some("anthropic-native".to_string()))
1567            } else {
1568                Ok(None)
1569            }
1570        };
1571        assert_eq!(
1572            resolve_provider_mode(&kv),
1573            Some(AiProviderMode::AnthropicNative)
1574        );
1575    }
1576
1577    #[test]
1578    fn resolve_default_provider_honors_mode_key() {
1579        let kv = |key: &str| -> crate::RedDBResult<Option<String>> {
1580            match key {
1581                "red.config.ai.provider" => Ok(Some("anthropic-native".to_string())),
1582                "red.config.ai.default.provider" => Ok(Some("groq".to_string())),
1583                _ => Ok(None),
1584            }
1585        };
1586        assert_eq!(resolve_default_provider(&kv), AiProvider::Anthropic);
1587    }
1588
1589    #[tokio::test]
1590    async fn anthropic_prompt_async_rejects_empty_api_key() {
1591        let transport = crate::runtime::ai::transport::AiTransport::new(Default::default());
1592        let request = AnthropicPromptRequest {
1593            api_key: "  ".to_string(),
1594            model: "claude-3-5-haiku-latest".to_string(),
1595            prompt: "hello".to_string(),
1596            temperature: None,
1597            max_output_tokens: None,
1598            api_base: "https://api.anthropic.com/v1".to_string(),
1599            anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(),
1600        };
1601        let err = anthropic_prompt_async(&transport, request)
1602            .await
1603            .unwrap_err();
1604        assert!(err.to_string().contains("API key cannot be empty"));
1605    }
1606}
1607
1608// ============================================================================
1609// Provider & Credential Resolution (shared between HTTP, gRPC, and runtime)
1610// ============================================================================
1611
1612/// AI provider identifier.
1613#[derive(Debug, Clone, PartialEq, Eq)]
1614pub enum AiProvider {
1615    OpenAi,
1616    Anthropic,
1617    Groq,
1618    OpenRouter,
1619    Together,
1620    Venice,
1621    Ollama,
1622    DeepSeek,
1623    MiniMax,
1624    HuggingFace,
1625    Local,
1626    Custom(String),
1627}
1628
1629impl AiProvider {
1630    pub fn token(&self) -> &str {
1631        match self {
1632            Self::OpenAi => "openai",
1633            Self::Anthropic => "anthropic",
1634            Self::Groq => "groq",
1635            Self::OpenRouter => "openrouter",
1636            Self::Together => "together",
1637            Self::Venice => "venice",
1638            Self::Ollama => "ollama",
1639            Self::DeepSeek => "deepseek",
1640            Self::MiniMax => "minimax",
1641            Self::HuggingFace => "huggingface",
1642            Self::Local => "local",
1643            Self::Custom(name) => name.as_str(),
1644        }
1645    }
1646
1647    pub fn default_prompt_model(&self) -> &str {
1648        match self {
1649            Self::OpenAi => DEFAULT_OPENAI_PROMPT_MODEL,
1650            Self::Anthropic => DEFAULT_ANTHROPIC_PROMPT_MODEL,
1651            Self::Groq => "llama-3.3-70b-versatile",
1652            Self::OpenRouter => "auto",
1653            Self::Together => "meta-llama/Meta-Llama-3-8B-Instruct",
1654            Self::Venice => "llama-3.3-70b",
1655            Self::Ollama => "llama3",
1656            Self::DeepSeek => "deepseek-chat",
1657            Self::MiniMax => "abab6.5s-chat",
1658            Self::HuggingFace => "mistralai/Mistral-7B-Instruct-v0.3",
1659            Self::Local => "sentence-transformers/all-MiniLM-L6-v2",
1660            Self::Custom(_) => DEFAULT_OPENAI_PROMPT_MODEL,
1661        }
1662    }
1663
1664    pub fn prompt_model_env_name(&self) -> String {
1665        format!("REDDB_{}_PROMPT_MODEL", self.token().to_ascii_uppercase())
1666    }
1667
1668    pub fn default_embedding_model(&self) -> &str {
1669        match self {
1670            Self::Ollama => "nomic-embed-text",
1671            Self::MiniMax => "embo-01",
1672            Self::HuggingFace | Self::Local => "sentence-transformers/all-MiniLM-L6-v2",
1673            _ => DEFAULT_OPENAI_EMBEDDING_MODEL,
1674        }
1675    }
1676
1677    pub fn default_api_base(&self) -> &str {
1678        match self {
1679            Self::OpenAi => DEFAULT_OPENAI_API_BASE,
1680            Self::Anthropic => DEFAULT_ANTHROPIC_API_BASE,
1681            Self::Groq => "https://api.groq.com/openai/v1",
1682            Self::OpenRouter => "https://openrouter.ai/api/v1",
1683            Self::Together => "https://api.together.xyz/v1",
1684            Self::Venice => "https://api.venice.ai/api/v1",
1685            Self::Ollama => "http://localhost:11434/v1",
1686            Self::DeepSeek => "https://api.deepseek.com/v1",
1687            Self::MiniMax => "https://api.minimax.chat/v1",
1688            Self::HuggingFace => "https://api-inference.huggingface.co",
1689            Self::Local => "local",
1690            Self::Custom(base) => base.as_str(),
1691        }
1692    }
1693
1694    pub fn api_base_env_name(&self) -> String {
1695        format!("REDDB_{}_API_BASE", self.token().to_ascii_uppercase())
1696    }
1697
1698    pub fn default_key_env_name(&self) -> String {
1699        format!("REDDB_{}_API_KEY", self.token().to_ascii_uppercase())
1700    }
1701
1702    pub fn alias_key_env_name(&self, alias: &str) -> String {
1703        let normalized = normalize_alias_token(alias);
1704        format!(
1705            "REDDB_{}_API_KEY_{normalized}",
1706            self.token().to_ascii_uppercase()
1707        )
1708    }
1709
1710    pub fn resolve_api_base(&self) -> String {
1711        if let Ok(value) = std::env::var(self.api_base_env_name()) {
1712            let value = value.trim().to_string();
1713            if !value.is_empty() {
1714                return value;
1715            }
1716        }
1717        self.default_api_base().to_string()
1718    }
1719
1720    /// Resolve API base URL checking KV store too (for custom base_url config).
1721    pub fn resolve_api_base_with_kv<F>(&self, alias: &str, kv_getter: &F) -> String
1722    where
1723        F: Fn(&str) -> crate::RedDBResult<Option<String>>,
1724    {
1725        // 1. Env var
1726        if let Ok(value) = std::env::var(self.api_base_env_name()) {
1727            let value = value.trim().to_string();
1728            if !value.is_empty() {
1729                return value;
1730            }
1731        }
1732        // 2. KV store: {provider}/{alias}/base_url
1733        let kv_key = format!("red.config.ai.{}.{alias}.base_url", self.token());
1734        if let Ok(Some(value)) = kv_getter(&kv_key) {
1735            let value = value.trim().to_string();
1736            if !value.is_empty() {
1737                return value;
1738            }
1739        }
1740        self.default_api_base().to_string()
1741    }
1742
1743    /// Whether this provider uses the OpenAI-compatible API format.
1744    pub fn is_openai_compatible(&self) -> bool {
1745        matches!(
1746            self,
1747            Self::OpenAi
1748                | Self::Groq
1749                | Self::OpenRouter
1750                | Self::Together
1751                | Self::Venice
1752                | Self::Ollama
1753                | Self::DeepSeek
1754                | Self::MiniMax
1755                | Self::Custom(_)
1756        )
1757    }
1758
1759    /// Whether this provider requires an API key (Ollama/Local don't).
1760    pub fn requires_api_key(&self) -> bool {
1761        !matches!(self, Self::Ollama | Self::Local)
1762    }
1763}
1764
1765/// Parse a provider string into AiProvider.
1766pub fn parse_provider(name: &str) -> crate::RedDBResult<AiProvider> {
1767    match name.trim().to_ascii_lowercase().as_str() {
1768        "openai" => Ok(AiProvider::OpenAi),
1769        "anthropic" => Ok(AiProvider::Anthropic),
1770        "groq" => Ok(AiProvider::Groq),
1771        "openrouter" | "open_router" => Ok(AiProvider::OpenRouter),
1772        "together" => Ok(AiProvider::Together),
1773        "venice" => Ok(AiProvider::Venice),
1774        "ollama" => Ok(AiProvider::Ollama),
1775        "deepseek" | "deep_seek" => Ok(AiProvider::DeepSeek),
1776        "minimax" | "mini_max" => Ok(AiProvider::MiniMax),
1777        "huggingface" | "hf" => Ok(AiProvider::HuggingFace),
1778        "local" => Ok(AiProvider::Local),
1779        other => {
1780            // Treat as custom provider if it looks like a URL
1781            if other.starts_with("http://") || other.starts_with("https://") {
1782                Ok(AiProvider::Custom(other.to_string()))
1783            } else {
1784                Err(crate::RedDBError::Query(format!(
1785                    "unsupported AI provider '{other}'; expected: openai, anthropic, groq, \
1786                     openrouter, together, venice, ollama, deepseek, minimax, huggingface, local"
1787                )))
1788            }
1789        }
1790    }
1791}
1792
1793/// Resolve the default AI provider. Checks:
1794/// 1. `REDDB_AI_PROVIDER` env var
1795/// 2. `red_config` KV key `red.config.ai.default.provider`
1796/// 3. Falls back to OpenAI
1797pub fn resolve_default_provider<F>(kv_getter: &F) -> AiProvider
1798where
1799    F: Fn(&str) -> crate::RedDBResult<Option<String>>,
1800{
1801    // 0. New mode selector (red.config.ai.provider) takes precedence
1802    //    when explicitly set — it picks the wire-protocol family.
1803    if let Some(mode) = resolve_provider_mode(kv_getter) {
1804        return provider_mode_to_provider(mode);
1805    }
1806    // 1. Env var
1807    if let Ok(value) = std::env::var("REDDB_AI_PROVIDER") {
1808        let value = value.trim().to_string();
1809        if !value.is_empty() {
1810            if let Ok(provider) = parse_provider(&value) {
1811                return provider;
1812            }
1813        }
1814    }
1815    // 2. KV store
1816    if let Ok(Some(value)) = kv_getter("red.config.ai.default.provider") {
1817        let value = value.trim().to_string();
1818        if !value.is_empty() {
1819            if let Ok(provider) = parse_provider(&value) {
1820                return provider;
1821            }
1822        }
1823    }
1824    AiProvider::OpenAi
1825}
1826
1827/// Resolve the default AI model. Checks:
1828/// 1. `REDDB_AI_MODEL` env var
1829/// 2. `red_config` KV key `red.config.ai.default.model`
1830/// 3. Falls back to provider's default
1831pub fn resolve_default_model<F>(provider: &AiProvider, kv_getter: &F) -> String
1832where
1833    F: Fn(&str) -> crate::RedDBResult<Option<String>>,
1834{
1835    // 1. Env var
1836    if let Ok(value) = std::env::var("REDDB_AI_MODEL") {
1837        let value = value.trim().to_string();
1838        if !value.is_empty() {
1839            return value;
1840        }
1841    }
1842    // 2. Provider-specific env var
1843    if let Ok(value) = std::env::var(provider.prompt_model_env_name()) {
1844        let value = value.trim().to_string();
1845        if !value.is_empty() {
1846            return value;
1847        }
1848    }
1849    // 3. KV store
1850    if let Ok(Some(value)) = kv_getter("red.config.ai.default.model") {
1851        let value = value.trim().to_string();
1852        if !value.is_empty() {
1853            return value;
1854        }
1855    }
1856    provider.default_prompt_model().to_string()
1857}
1858
1859/// Resolve default provider + model from runtime KV store.
1860pub fn resolve_defaults_from_runtime(
1861    runtime: &crate::runtime::RedDBRuntime,
1862) -> (AiProvider, String) {
1863    use crate::application::ports::RuntimeEntityPort;
1864    let kv_getter = |key: &str| -> crate::RedDBResult<Option<String>> {
1865        match runtime.get_kv("red_config", key)? {
1866            Some((crate::storage::schema::Value::Text(s), _)) => Ok(Some(s.to_string())),
1867            _ => Ok(None),
1868        }
1869    };
1870    let provider = resolve_default_provider(&kv_getter);
1871    let model = resolve_default_model(&provider, &kv_getter);
1872    (provider, model)
1873}
1874
1875/// Resolve default provider + model via RuntimeEntityPort trait (for use in QueryUseCases).
1876pub fn resolve_defaults_from_runtime_port<
1877    P: crate::application::ports::RuntimeEntityPort + ?Sized,
1878>(
1879    runtime: &P,
1880) -> (AiProvider, String) {
1881    let kv_getter = |key: &str| -> crate::RedDBResult<Option<String>> {
1882        match runtime.get_kv("red_config", key)? {
1883            Some((crate::storage::schema::Value::Text(s), _)) => Ok(Some(s.to_string())),
1884            _ => Ok(None),
1885        }
1886    };
1887    let provider = resolve_default_provider(&kv_getter);
1888    let model = resolve_default_model(&provider, &kv_getter);
1889    (provider, model)
1890}
1891
1892/// Resolve an API key for a provider, **preferring the encrypted vault
1893/// over environment variables** (issue #1270). The env vars are a
1894/// bootstrap fallback so a fresh deployment can talk to a provider
1895/// before any key has been written to the vault.
1896///
1897/// Aliased lookup (`credential_alias = Some(..)`):
1898/// 1. Vault secret path: `red.secret.ai.<provider>.<alias>.api_key`
1899/// 2. Vault secret indirected via `red.config.ai.<provider>.<alias>.secret_ref`
1900/// 3. Env fallback with alias: `REDDB_<PROVIDER>_API_KEY_{ALIAS}`
1901/// 4. Legacy KV config: `red.config.ai.<provider>.<alias>.key`
1902///
1903/// Default lookup (`credential_alias = None`):
1904/// 1. Vault secret path: `red.secret.ai.<provider>.default.api_key`
1905/// 2. Vault secret indirected via `secret_ref`
1906/// 3. Env fallback: `REDDB_<PROVIDER>_API_KEY`
1907/// 4. Legacy KV config (`red.config.ai.<provider>.default.key` and the
1908///    short `<provider>/default` form)
1909///
1910/// `kv_getter` receives either a `red.secret.*` path (routed to the
1911/// encrypted vault by [`resolve_api_key_from_runtime`]) or a legacy
1912/// `red.config.*` key and returns the value if found. Vault-stored keys
1913/// are therefore encrypted at rest and rotatable via the existing vault
1914/// KV path; the env vars carry no such guarantees, which is why they are
1915/// the fallback rather than the primary source.
1916pub fn resolve_api_key<F>(
1917    provider: &AiProvider,
1918    credential_alias: Option<&str>,
1919    kv_getter: F,
1920) -> crate::RedDBResult<String>
1921where
1922    F: Fn(&str) -> crate::RedDBResult<Option<String>>,
1923{
1924    // Providers that don't require API keys
1925    if !provider.requires_api_key() {
1926        // Still try to find a key (user may have one for auth'd Ollama)
1927        if let Ok(value) = std::env::var(provider.default_key_env_name()) {
1928            let value = value.trim().to_string();
1929            if !value.is_empty() {
1930                return Ok(value);
1931            }
1932        }
1933        return Ok(String::new());
1934    }
1935
1936    if let Some(alias) = credential_alias.map(str::trim).filter(|a| !a.is_empty()) {
1937        // 1. Vault secret path (managed, encrypted at rest).
1938        if let Some(key) = kv_getter(&ai_api_secret_path(provider, alias))? {
1939            if !key.trim().is_empty() {
1940                return Ok(key);
1941            }
1942        }
1943        // 2. Vault secret reachable through a configured indirection ref.
1944        if let Some(secret_ref) = kv_getter(&ai_api_secret_ref_config_key(provider, alias))? {
1945            if let Some(key) = kv_getter(secret_ref.trim())? {
1946                if !key.trim().is_empty() {
1947                    return Ok(key);
1948                }
1949            }
1950        }
1951        // 3. Env fallback with alias (bootstrap before vault is populated).
1952        if let Some(key) = resolve_key_from_env_alias(provider, alias) {
1953            return Ok(key);
1954        }
1955        let legacy_key = ai_api_legacy_config_key(provider, alias);
1956        if let Some(key) = kv_getter(&legacy_key)? {
1957            if !key.trim().is_empty() {
1958                return Ok(key);
1959            }
1960        }
1961        return Err(crate::RedDBError::Query(format!(
1962            "credential '{alias}' not found for {}. Set env {} or store it in the vault",
1963            provider.token(),
1964            provider.alias_key_env_name(alias)
1965        )));
1966    }
1967
1968    // 1. Vault secret path (managed, encrypted at rest).
1969    if let Some(key) = kv_getter(&ai_api_secret_path(provider, "default"))? {
1970        if !key.trim().is_empty() {
1971            return Ok(key);
1972        }
1973    }
1974    // 2. Vault secret reachable through a configured indirection ref.
1975    if let Some(secret_ref) = kv_getter(&ai_api_secret_ref_config_key(provider, "default"))? {
1976        if let Some(key) = kv_getter(secret_ref.trim())? {
1977            if !key.trim().is_empty() {
1978                return Ok(key);
1979            }
1980        }
1981    }
1982
1983    // 3. Env fallback (bootstrap before the vault is populated).
1984    if let Ok(value) = std::env::var(provider.default_key_env_name()) {
1985        let value = value.trim().to_string();
1986        if !value.is_empty() {
1987            return Ok(value);
1988        }
1989    }
1990
1991    if let Some(key) = kv_getter(&ai_api_legacy_config_key(provider, "default"))? {
1992        if !key.trim().is_empty() {
1993            return Ok(key);
1994        }
1995    }
1996
1997    let legacy_short_key = format!("{}/default", provider.token());
1998    if let Some(key) = kv_getter(&legacy_short_key)? {
1999        if !key.trim().is_empty() {
2000            return Ok(key);
2001        }
2002    }
2003
2004    Err(crate::RedDBError::Query(format!(
2005        "missing {} API key. Set {} or provide credential alias",
2006        provider.token(),
2007        provider.default_key_env_name()
2008    )))
2009}
2010
2011pub fn ai_api_secret_path(provider: &AiProvider, alias: &str) -> String {
2012    format!(
2013        "red.secret.ai.{}.{}.api_key",
2014        provider.token(),
2015        normalize_credential_alias_path(alias)
2016    )
2017}
2018
2019pub fn ai_api_secret_ref_config_key(provider: &AiProvider, alias: &str) -> String {
2020    format!(
2021        "red.config.ai.{}.{}.secret_ref",
2022        provider.token(),
2023        normalize_credential_alias_path(alias)
2024    )
2025}
2026
2027pub fn ai_api_legacy_config_key(provider: &AiProvider, alias: &str) -> String {
2028    format!(
2029        "red.config.ai.{}.{}.key",
2030        provider.token(),
2031        normalize_credential_alias_path(alias)
2032    )
2033}
2034
2035fn normalize_credential_alias_path(alias: &str) -> String {
2036    let alias = alias.trim();
2037    if alias.is_empty() {
2038        "default".to_string()
2039    } else {
2040        alias.to_ascii_lowercase()
2041    }
2042}
2043
2044fn resolve_key_from_env_alias(provider: &AiProvider, alias: &str) -> Option<String> {
2045    let env_name = provider.alias_key_env_name(alias);
2046    std::env::var(env_name)
2047        .ok()
2048        .map(|v| v.trim().to_string())
2049        .filter(|v| !v.is_empty())
2050}
2051
2052fn normalize_alias_token(alias: &str) -> String {
2053    let mut out = String::with_capacity(alias.len());
2054    for character in alias.chars() {
2055        if character.is_ascii_alphanumeric() {
2056            out.push(character.to_ascii_uppercase());
2057        } else {
2058            out.push('_');
2059        }
2060    }
2061    while out.contains("__") {
2062        out = out.replace("__", "_");
2063    }
2064    out.trim_matches('_').to_string()
2065}
2066
2067/// Convenience: resolve API key using a RedDBRuntime's KV store.
2068///
2069/// Emits an `ai.credential.resolve` audit event so operators can answer
2070/// "which principal caused us to read `red.secret.ai.<provider>.*`?"
2071/// even though the read itself is performed as system (the AI subsystem
2072/// must always be able to fetch the key the query needs — denying it
2073/// would be denying the query at the wrong layer). The audit record
2074/// never contains the secret value.
2075pub fn resolve_api_key_from_runtime(
2076    provider: &AiProvider,
2077    credential_alias: Option<&str>,
2078    runtime: &crate::runtime::RedDBRuntime,
2079) -> crate::RedDBResult<String> {
2080    use crate::application::ports::RuntimeEntityPort;
2081    let alias_for_audit = credential_alias.unwrap_or("default").to_string();
2082    let provider_token = provider.token().to_string();
2083    let audited_paths: std::cell::RefCell<Vec<(String, bool)>> =
2084        std::cell::RefCell::new(Vec::new());
2085    let result = resolve_api_key(provider, credential_alias, |kv_key| {
2086        if kv_key.starts_with("red.secret.") {
2087            let value = runtime.vault_kv_get(kv_key);
2088            audited_paths
2089                .borrow_mut()
2090                .push((kv_key.to_string(), value.is_some()));
2091            return Ok(value);
2092        }
2093        match runtime.get_kv("red_config", kv_key)? {
2094            Some((crate::storage::schema::Value::Text(secret), _)) => {
2095                audited_paths.borrow_mut().push((kv_key.to_string(), true));
2096                Ok(Some(secret.to_string()))
2097            }
2098            Some(_) => {
2099                audited_paths.borrow_mut().push((kv_key.to_string(), false));
2100                Ok(None)
2101            }
2102            None => {
2103                audited_paths.borrow_mut().push((kv_key.to_string(), false));
2104                Ok(None)
2105            }
2106        }
2107    });
2108    let audited_paths = audited_paths.into_inner();
2109
2110    let principal = crate::runtime::impl_core::current_auth_identity_for_audit()
2111        .map(|(user, _role)| user)
2112        .unwrap_or_else(|| "system".to_string());
2113    let outcome = if result.is_ok() { "hit" } else { "miss" };
2114    let target = format!("ai.credential:{provider_token}/{alias_for_audit}");
2115    let paths_json: Vec<crate::serde_json::Value> = audited_paths
2116        .iter()
2117        .map(|(p, hit)| {
2118            crate::serde_json::json!({
2119                "path": p,
2120                "hit": hit,
2121            })
2122        })
2123        .collect();
2124    let details = crate::serde_json::json!({
2125        "provider": provider_token,
2126        "alias": alias_for_audit,
2127        "paths_checked": paths_json,
2128    });
2129    runtime.audit_log().record(
2130        "ai.credential.resolve",
2131        &principal,
2132        &target,
2133        outcome,
2134        details,
2135    );
2136    result
2137}
2138
2139// ============================================================================
2140// HuggingFace Inference API
2141// ============================================================================
2142
2143/// Generate embeddings via HuggingFace Inference API.
2144pub fn huggingface_embeddings(
2145    api_key: &str,
2146    model: &str,
2147    inputs: &[String],
2148    api_base: &str,
2149) -> crate::RedDBResult<OpenAiEmbeddingResponse> {
2150    let url = format!("{api_base}/pipeline/feature-extraction/{model}");
2151    let mut embeddings = Vec::with_capacity(inputs.len());
2152
2153    for input in inputs {
2154        let payload = crate::serde_json::json!({ "inputs": input }).to_string_compact();
2155        let (status, body_str) = http_post_json(&url, api_key, &[], payload, 90)
2156            .map_err(|e| crate::RedDBError::Query(format!("HuggingFace API error: {e}")))?;
2157        if !(200..300).contains(&status) {
2158            return Err(crate::RedDBError::Query(format!(
2159                "HuggingFace API error (status {status}): {body_str}"
2160            )));
2161        }
2162        let body: JsonValue = crate::serde_json::from_str(&body_str).map_err(|e| {
2163            crate::RedDBError::Query(format!("HuggingFace response parse error: {e}"))
2164        })?;
2165
2166        // HF returns [[f32, ...]] for single input
2167        let vector: Vec<f32> = match &body {
2168            JsonValue::Array(outer) => outer
2169                .iter()
2170                .filter_map(|v| v.as_f64().map(|n| n as f32))
2171                .collect(),
2172            _ => {
2173                return Err(crate::RedDBError::Query(
2174                    "unexpected HuggingFace embedding response format".to_string(),
2175                ))
2176            }
2177        };
2178        embeddings.push(vector);
2179    }
2180
2181    Ok(OpenAiEmbeddingResponse {
2182        provider: "huggingface",
2183        model: model.to_string(),
2184        embeddings,
2185        prompt_tokens: None,
2186        total_tokens: None,
2187    })
2188}
2189
2190/// Generate text via HuggingFace Inference API.
2191pub fn huggingface_prompt(
2192    api_key: &str,
2193    model: &str,
2194    prompt: &str,
2195    temperature: Option<f32>,
2196    max_tokens: Option<usize>,
2197    api_base: &str,
2198) -> crate::RedDBResult<AiPromptResponse> {
2199    let url = format!("{api_base}/models/{model}");
2200    let mut params = Map::new();
2201    if let Some(t) = temperature {
2202        params.insert("temperature".into(), JsonValue::Number(t as f64));
2203    }
2204    params.insert(
2205        "max_new_tokens".into(),
2206        JsonValue::Number(max_tokens.unwrap_or(512) as f64),
2207    );
2208    let payload = crate::serde_json::json!({
2209        "inputs": prompt,
2210        "parameters": JsonValue::Object(params)
2211    });
2212
2213    let (status, body_str) =
2214        http_post_json(&url, api_key, &[], payload.to_string_compact(), 120)
2215            .map_err(|e| crate::RedDBError::Query(format!("HuggingFace API error: {e}")))?;
2216    if !(200..300).contains(&status) {
2217        return Err(crate::RedDBError::Query(format!(
2218            "HuggingFace API error (status {status}): {body_str}"
2219        )));
2220    }
2221    let body: JsonValue = crate::serde_json::from_str(&body_str)
2222        .map_err(|e| crate::RedDBError::Query(format!("HuggingFace response parse error: {e}")))?;
2223
2224    let output_text = match &body {
2225        JsonValue::Array(arr) => arr
2226            .first()
2227            .and_then(|v| v.get("generated_text"))
2228            .and_then(JsonValue::as_str)
2229            .unwrap_or("")
2230            .to_string(),
2231        _ => body
2232            .get("generated_text")
2233            .and_then(JsonValue::as_str)
2234            .unwrap_or("")
2235            .to_string(),
2236    };
2237
2238    Ok(AiPromptResponse {
2239        provider: "huggingface",
2240        model: model.to_string(),
2241        output_text,
2242        output_chunks: None,
2243        prompt_tokens: None,
2244        completion_tokens: None,
2245        total_tokens: None,
2246        stop_reason: None,
2247    })
2248}
2249
2250// ============================================================================
2251// Local model stubs (requires 'local-models' feature flag)
2252// ============================================================================
2253
2254const LOCAL_MODELS_DISABLED_MESSAGE: &str = "local embeddings require the `local-models` feature \
2255flag at engine build time. Build with: cargo build --features local-models. Alternatively, use \
2256the 'ollama' provider with a local Ollama server.";
2257
2258const LOCAL_EMBEDDINGS_NOT_IMPLEMENTED_MESSAGE: &str = "local embeddings are registered by the \
2259`local-models` feature, but local model artifact execution is not implemented in this slice. \
2260Alternatively, use the 'ollama' provider with a local Ollama server.";
2261
2262const LOCAL_PROMPT_OUT_OF_SCOPE_MESSAGE: &str = "local prompt and generation are out of scope for \
2263the `local-models` feature; the local provider contract is embeddings-only for this slice.";
2264
2265pub fn local_embeddings_unavailable_error() -> crate::RedDBError {
2266    if cfg!(feature = "local-models") {
2267        crate::RedDBError::Query(LOCAL_EMBEDDINGS_NOT_IMPLEMENTED_MESSAGE.to_string())
2268    } else {
2269        crate::RedDBError::FeatureNotEnabled(LOCAL_MODELS_DISABLED_MESSAGE.to_string())
2270    }
2271}
2272
2273pub fn local_prompt_unavailable_error() -> crate::RedDBError {
2274    crate::RedDBError::Query(LOCAL_PROMPT_OUT_OF_SCOPE_MESSAGE.to_string())
2275}
2276
2277/// Local embedding via candle — requires `local-models` feature.
2278pub fn local_embeddings(
2279    _model_id: &str,
2280    _texts: &[String],
2281) -> crate::RedDBResult<OpenAiEmbeddingResponse> {
2282    Err(local_embeddings_unavailable_error())
2283}
2284
2285/// Local prompt via candle — requires `local-models` feature.
2286pub fn local_prompt(_model_id: &str, _prompt: &str) -> crate::RedDBResult<AiPromptResponse> {
2287    Err(local_prompt_unavailable_error())
2288}
2289
2290// ============================================================================
2291// gRPC input collection — parity with HTTP /ai/embeddings
2292// ============================================================================
2293
2294/// Collect embedding inputs from any of the three supported shapes.
2295///
2296/// * `input: "..."` — single string.
2297/// * `inputs: ["...", ...]` — array of strings.
2298/// * `source_query: "SELECT ..."` — runs a SQL query and projects
2299///   either the named `source_field` from each row (source_mode =
2300///   "row", default) or every string cell of every result row
2301///   (source_mode = "result").
2302fn grpc_collect_embedding_inputs(
2303    runtime: &crate::runtime::RedDBRuntime,
2304    payload: &JsonValue,
2305) -> crate::RedDBResult<Vec<String>> {
2306    if let Some(source_query) = payload
2307        .get("source_query")
2308        .and_then(|v| v.as_str())
2309        .map(str::trim)
2310        .filter(|s| !s.is_empty())
2311    {
2312        return grpc_collect_inputs_from_source_query(runtime, payload, source_query);
2313    }
2314
2315    if let Some(arr) = payload.get("inputs").and_then(|v| v.as_array()) {
2316        let mut out = Vec::with_capacity(arr.len());
2317        for (idx, v) in arr.iter().enumerate() {
2318            let text = v.as_str().ok_or_else(|| {
2319                crate::RedDBError::Query(format!("field 'inputs[{idx}]' must be a string"))
2320            })?;
2321            if text.trim().is_empty() {
2322                return Err(crate::RedDBError::Query(format!(
2323                    "field 'inputs[{idx}]' cannot be empty"
2324                )));
2325            }
2326            out.push(text.to_string());
2327        }
2328        if out.is_empty() {
2329            return Err(crate::RedDBError::Query(
2330                "field 'inputs' must be a non-empty array of strings".to_string(),
2331            ));
2332        }
2333        return Ok(out);
2334    }
2335
2336    if let Some(single) = payload
2337        .get("input")
2338        .and_then(|v| v.as_str())
2339        .map(str::trim)
2340        .filter(|s| !s.is_empty())
2341    {
2342        return Ok(vec![single.to_string()]);
2343    }
2344
2345    Err(crate::RedDBError::Query(
2346        "provide either 'input', 'inputs', or 'source_query'".to_string(),
2347    ))
2348}
2349
2350fn grpc_collect_inputs_from_source_query(
2351    runtime: &crate::runtime::RedDBRuntime,
2352    payload: &JsonValue,
2353    source_query: &str,
2354) -> crate::RedDBResult<Vec<String>> {
2355    let result = runtime
2356        .execute_query(source_query)
2357        .map_err(|err| crate::RedDBError::Query(format!("source_query failed: {err}")))?;
2358
2359    let source_mode = payload
2360        .get("source_mode")
2361        .and_then(|v| v.as_str())
2362        .map(str::trim)
2363        .filter(|s| !s.is_empty())
2364        .unwrap_or("row")
2365        .to_ascii_lowercase();
2366
2367    let mut out: Vec<String> = Vec::new();
2368    match source_mode.as_str() {
2369        "row" => {
2370            let field = payload
2371                .get("source_field")
2372                .and_then(|v| v.as_str())
2373                .map(str::trim)
2374                .filter(|s| !s.is_empty())
2375                .ok_or_else(|| {
2376                    crate::RedDBError::Query(
2377                        "field 'source_field' is required when source_mode='row'".to_string(),
2378                    )
2379                })?;
2380            for rec in &result.result.records {
2381                for (key, value) in rec.iter_fields() {
2382                    if key.as_ref() == field {
2383                        if let crate::storage::schema::Value::Text(text) = value {
2384                            let trimmed = text.trim();
2385                            if !trimmed.is_empty() {
2386                                out.push(trimmed.to_string());
2387                            }
2388                        }
2389                    }
2390                }
2391            }
2392        }
2393        "result" => {
2394            for rec in &result.result.records {
2395                for (_, value) in rec.iter_fields() {
2396                    if let crate::storage::schema::Value::Text(text) = value {
2397                        let trimmed = text.trim();
2398                        if !trimmed.is_empty() {
2399                            out.push(trimmed.to_string());
2400                        }
2401                    }
2402                }
2403            }
2404        }
2405        other => {
2406            return Err(crate::RedDBError::Query(format!(
2407                "field 'source_mode' must be 'row' or 'result' (got '{other}')"
2408            )));
2409        }
2410    }
2411
2412    if out.is_empty() {
2413        return Err(crate::RedDBError::Query(
2414            "source_query produced zero non-empty text inputs".to_string(),
2415        ));
2416    }
2417    Ok(out)
2418}
2419
2420// ============================================================================
2421// gRPC stubs — delegate to the same logic as HTTP handlers
2422// ============================================================================
2423
2424/// gRPC embeddings — shared entrypoint that mirrors the HTTP handler.
2425///
2426/// Accepts the same JSON payload shape as `POST /ai/embeddings`:
2427///
2428/// ```json
2429/// { "provider": "openai", "model": "text-embedding-3-small",
2430///   "inputs": ["hello", "world"], "credential": "optional-alias" }
2431/// ```
2432///
2433/// Input shapes at parity with HTTP: `input` (single string),
2434/// `inputs` (array of strings), and `source_query` (SQL that the
2435/// runtime executes to materialise the input texts; `source_mode`
2436/// = `row` needs `source_collection` + `source_field`, `result`
2437/// uses the projected columns). Returns a JSON object with
2438/// `provider`, `model`, `embeddings`, `prompt_tokens`,
2439/// `total_tokens`. Non-OpenAI-compatible providers are rejected
2440/// with a clear message, matching the HTTP handler.
2441pub fn grpc_embeddings(
2442    runtime: &crate::runtime::RedDBRuntime,
2443    payload: &JsonValue,
2444) -> crate::RedDBResult<JsonValue> {
2445    let provider_name = payload
2446        .get("provider")
2447        .and_then(|v| v.as_str())
2448        .map(str::trim)
2449        .filter(|s| !s.is_empty())
2450        .unwrap_or("openai");
2451    let provider = parse_provider(provider_name)?;
2452    // Routing matrix mirrors `handle_ai_embeddings`. See that function
2453    // for the rationale; in short: HuggingFace gets its own wire
2454    // shape, Anthropic fails fast (no embeddings product), and Local
2455    // requires a build-time feature flag.
2456    match &provider {
2457        AiProvider::Anthropic => {
2458            return Err(crate::RedDBError::Query(
2459                "Anthropic does not offer an embeddings API. \
2460                 Re-issue the request against an OpenAI-compatible \
2461                 provider (openai, groq, ollama, openrouter, together, \
2462                 venice, deepseek), HuggingFace, or a custom base URL — \
2463                 RedDB does not silently route embeddings to a \
2464                 different provider than the one you named."
2465                    .to_string(),
2466            ));
2467        }
2468        AiProvider::Local => {
2469            return grpc_embeddings_local(runtime, payload);
2470        }
2471        _ => {}
2472    }
2473
2474    let inputs: Vec<String> = grpc_collect_embedding_inputs(runtime, payload)?;
2475
2476    let model = payload
2477        .get("model")
2478        .and_then(|v| v.as_str())
2479        .map(str::trim)
2480        .filter(|s| !s.is_empty())
2481        .map(str::to_string)
2482        .or_else(|| {
2483            std::env::var(format!(
2484                "REDDB_{}_EMBEDDING_MODEL",
2485                provider.token().to_ascii_uppercase()
2486            ))
2487            .ok()
2488        })
2489        .or_else(|| std::env::var("REDDB_OPENAI_EMBEDDING_MODEL").ok())
2490        .filter(|v| !v.trim().is_empty())
2491        .unwrap_or_else(|| provider.default_embedding_model().to_string());
2492
2493    let credential = payload
2494        .get("credential")
2495        .and_then(|v| v.as_str())
2496        .map(str::to_string);
2497    let api_key = resolve_api_key_from_runtime(&provider, credential.as_deref(), runtime)?;
2498
2499    let dimensions = payload
2500        .get("dimensions")
2501        .and_then(|v| v.as_i64())
2502        .and_then(|v| usize::try_from(v).ok())
2503        .filter(|v| *v > 0);
2504
2505    let response = match &provider {
2506        AiProvider::HuggingFace => {
2507            huggingface_embeddings(&api_key, &model, &inputs, &provider.resolve_api_base())?
2508        }
2509        _ => {
2510            let transport = crate::runtime::ai::transport::AiTransport::from_runtime(runtime);
2511            let request = OpenAiEmbeddingRequest {
2512                api_key,
2513                model,
2514                inputs,
2515                dimensions,
2516                api_base: provider.resolve_api_base(),
2517            };
2518            crate::runtime::ai::block_on_ai(async move {
2519                openai_embeddings_async(&transport, request).await
2520            })
2521            .and_then(|result| result)?
2522        }
2523    };
2524
2525    let embeddings_json: Vec<JsonValue> = response
2526        .embeddings
2527        .into_iter()
2528        .map(|vec| {
2529            JsonValue::Array(
2530                vec.into_iter()
2531                    .map(|f| JsonValue::Number(f as f64))
2532                    .collect(),
2533            )
2534        })
2535        .collect();
2536
2537    let mut obj = Map::new();
2538    obj.insert(
2539        "provider".to_string(),
2540        JsonValue::String(response.provider.to_string()),
2541    );
2542    obj.insert("model".to_string(), JsonValue::String(response.model));
2543    obj.insert("embeddings".to_string(), JsonValue::Array(embeddings_json));
2544    if let Some(pt) = response.prompt_tokens {
2545        obj.insert("prompt_tokens".to_string(), JsonValue::Number(pt as f64));
2546    }
2547    if let Some(tt) = response.total_tokens {
2548        obj.insert("total_tokens".to_string(), JsonValue::Number(tt as f64));
2549    }
2550    Ok(JsonValue::Object(obj))
2551}
2552
2553/// gRPC local-provider embedding path (#680).
2554///
2555/// Mirrors the HTTP local path: resolves a registered+installed local
2556/// model, runs the runtime backend, and returns the same JSON shape
2557/// the HTTP handler produces (`provider`, `model`, `model_source`,
2558/// `model_revision`, `model_engine`, `dimensions`, `embeddings`).
2559/// Save-side behaviour is HTTP-only; gRPC mirrors the OpenAI-compatible
2560/// gRPC path which also does not persist.
2561fn grpc_embeddings_local(
2562    runtime: &crate::runtime::RedDBRuntime,
2563    payload: &JsonValue,
2564) -> crate::RedDBResult<JsonValue> {
2565    crate::runtime::ai::local_embedding::ensure_local_embedding_available()?;
2566
2567    let model_name = payload
2568        .get("model")
2569        .and_then(|v| v.as_str())
2570        .map(str::trim)
2571        .filter(|s| !s.is_empty())
2572        .ok_or_else(|| {
2573            crate::RedDBError::Query(
2574                "field 'model' is required for the local provider and must be the \
2575                 registered local model name (see POST /ai/models)"
2576                    .to_string(),
2577            )
2578        })?
2579        .to_string();
2580
2581    let inputs = grpc_collect_embedding_inputs(runtime, payload)?;
2582    let response = crate::runtime::ai::local_embedding::embed_local(runtime, &model_name, inputs)?;
2583
2584    let embeddings_json: Vec<JsonValue> = response
2585        .embeddings
2586        .into_iter()
2587        .map(|vec| {
2588            JsonValue::Array(
2589                vec.into_iter()
2590                    .map(|f| JsonValue::Number(f as f64))
2591                    .collect(),
2592            )
2593        })
2594        .collect();
2595
2596    let mut obj = Map::new();
2597    obj.insert(
2598        "provider".to_string(),
2599        JsonValue::String(response.provider.to_string()),
2600    );
2601    obj.insert("model".to_string(), JsonValue::String(response.name));
2602    obj.insert(
2603        "model_source".to_string(),
2604        JsonValue::String(response.source),
2605    );
2606    obj.insert(
2607        "model_revision".to_string(),
2608        JsonValue::String(response.revision),
2609    );
2610    obj.insert(
2611        "model_engine".to_string(),
2612        JsonValue::String(response.engine),
2613    );
2614    obj.insert(
2615        "dimensions".to_string(),
2616        JsonValue::Number(response.dimensions as f64),
2617    );
2618    obj.insert("embeddings".to_string(), JsonValue::Array(embeddings_json));
2619    Ok(JsonValue::Object(obj))
2620}
2621
2622/// gRPC stub for AI prompt.
2623pub fn grpc_prompt(
2624    _runtime: &crate::runtime::RedDBRuntime,
2625    _payload: &JsonValue,
2626) -> crate::RedDBResult<JsonValue> {
2627    Err(crate::RedDBError::FeatureNotEnabled(
2628        "AI prompt via gRPC requires HTTP endpoint; use POST /ai/prompt".to_string(),
2629    ))
2630}
2631
2632/// gRPC stub for AI credentials.
2633pub fn grpc_credentials(
2634    _runtime: &crate::runtime::RedDBRuntime,
2635    _payload: &JsonValue,
2636) -> crate::RedDBResult<JsonValue> {
2637    Err(crate::RedDBError::FeatureNotEnabled(
2638        "AI credentials via gRPC requires HTTP endpoint; use POST /ai/credentials".to_string(),
2639    ))
2640}
2641
2642// ============================================================================
2643// Generic OpenAI-compatible client (issue gh-516)
2644//
2645// Thin blocking client that targets any `{api_base}/chat/completions`
2646// and `{api_base}/embeddings` endpoint with arbitrary auth headers.
2647// Existing vendor-native paths (`openai_prompt_async`,
2648// `anthropic_prompt_async`) remain unchanged; this exists so callers
2649// can talk to non-OpenAI providers that expose an OpenAI-compatible
2650// surface (Groq, OpenRouter, Together, Ollama, vLLM, LM Studio, ...)
2651// without having to register a new `AiProvider` variant.
2652// ============================================================================
2653
2654/// Normalized usage block. Field names follow the Anthropic shape
2655/// (`input_tokens` / `output_tokens`) so downstream cost-accounting
2656/// has one canonical schema regardless of the upstream provider.
2657#[derive(Debug, Clone, Default, PartialEq, Eq)]
2658pub struct OpenAiCompatUsage {
2659    pub input_tokens: Option<u64>,
2660    pub output_tokens: Option<u64>,
2661    pub total_tokens: Option<u64>,
2662}
2663
2664#[derive(Debug, Clone)]
2665pub struct OpenAiCompatChatRequest {
2666    pub api_base: String,
2667    pub api_key: String,
2668    pub model: String,
2669    pub prompt: String,
2670    pub temperature: Option<f32>,
2671    pub seed: Option<u64>,
2672    pub max_output_tokens: Option<usize>,
2673    pub extra_headers: Vec<(String, String)>,
2674}
2675
2676#[derive(Debug, Clone)]
2677pub struct OpenAiCompatChatResponse {
2678    pub model: String,
2679    pub output_text: String,
2680    pub stop_reason: Option<String>,
2681    pub usage: OpenAiCompatUsage,
2682}
2683
2684#[derive(Debug, Clone)]
2685pub struct OpenAiCompatEmbeddingsRequest {
2686    pub api_base: String,
2687    pub api_key: String,
2688    pub model: String,
2689    pub inputs: Vec<String>,
2690    pub dimensions: Option<usize>,
2691    pub extra_headers: Vec<(String, String)>,
2692}
2693
2694#[derive(Debug, Clone)]
2695pub struct OpenAiCompatEmbeddingsResponse {
2696    pub model: String,
2697    pub embeddings: Vec<Vec<f32>>,
2698    pub usage: OpenAiCompatUsage,
2699}
2700
2701fn extra_header_refs(headers: &[(String, String)]) -> Vec<(&str, &str)> {
2702    headers
2703        .iter()
2704        .map(|(k, v)| (k.as_str(), v.as_str()))
2705        .collect()
2706}
2707
2708/// POST `{api_base}/chat/completions` and return a normalized response.
2709///
2710/// Errors:
2711/// * empty model / prompt → `RedDBError::Query`.
2712/// * transport / non-2xx → `RedDBError::Query` carrying the status code
2713///   and the provider's parsed `error.message` when available, raw body
2714///   otherwise.
2715pub fn openai_compat_chat(
2716    request: OpenAiCompatChatRequest,
2717) -> RedDBResult<OpenAiCompatChatResponse> {
2718    if request.model.trim().is_empty() {
2719        return Err(RedDBError::Query(
2720            "openai-compat: model cannot be empty".to_string(),
2721        ));
2722    }
2723    if request.prompt.trim().is_empty() {
2724        return Err(RedDBError::Query(
2725            "openai-compat: prompt cannot be empty".to_string(),
2726        ));
2727    }
2728
2729    let url = format!(
2730        "{}/chat/completions",
2731        request.api_base.trim_end_matches('/')
2732    );
2733    let payload = build_openai_prompt_payload(
2734        &request.model,
2735        &request.prompt,
2736        request.temperature,
2737        request.seed,
2738        request.max_output_tokens,
2739        false,
2740    );
2741
2742    let extra = extra_header_refs(&request.extra_headers);
2743    let (status, body) = http_post_json(&url, &request.api_key, &extra, payload, 120)
2744        .map_err(|err| RedDBError::Query(format!("openai-compat transport error: {err}")))?;
2745
2746    if !(200..300).contains(&status) {
2747        let message = openai_error_message(&body).unwrap_or_else(|| {
2748            if body.trim().is_empty() {
2749                "openai-compat chat request failed".to_string()
2750            } else {
2751                body.clone()
2752            }
2753        });
2754        return Err(RedDBError::Query(format!(
2755            "openai-compat chat request failed (status {status}): {message}"
2756        )));
2757    }
2758
2759    let parsed = parse_openai_prompt_response(&body, &request.model)?;
2760    Ok(OpenAiCompatChatResponse {
2761        model: parsed.model,
2762        output_text: parsed.output_text,
2763        stop_reason: parsed.stop_reason,
2764        usage: OpenAiCompatUsage {
2765            input_tokens: parsed.prompt_tokens,
2766            output_tokens: parsed.completion_tokens,
2767            total_tokens: parsed.total_tokens,
2768        },
2769    })
2770}
2771
2772/// POST `{api_base}/embeddings` and return a normalized response.
2773pub fn openai_compat_embeddings(
2774    request: OpenAiCompatEmbeddingsRequest,
2775) -> RedDBResult<OpenAiCompatEmbeddingsResponse> {
2776    if request.model.trim().is_empty() {
2777        return Err(RedDBError::Query(
2778            "openai-compat: embedding model cannot be empty".to_string(),
2779        ));
2780    }
2781    if request.inputs.is_empty() {
2782        return Err(RedDBError::Query(
2783            "openai-compat: at least one input is required".to_string(),
2784        ));
2785    }
2786
2787    let url = format!("{}/embeddings", request.api_base.trim_end_matches('/'));
2788    let payload =
2789        build_openai_embedding_payload(&request.model, &request.inputs, request.dimensions);
2790
2791    let extra = extra_header_refs(&request.extra_headers);
2792    let (status, body) = http_post_json(&url, &request.api_key, &extra, payload, 90)
2793        .map_err(|err| RedDBError::Query(format!("openai-compat transport error: {err}")))?;
2794
2795    if !(200..300).contains(&status) {
2796        let message = openai_error_message(&body).unwrap_or_else(|| {
2797            if body.trim().is_empty() {
2798                "openai-compat embeddings request failed".to_string()
2799            } else {
2800                body.clone()
2801            }
2802        });
2803        return Err(RedDBError::Query(format!(
2804            "openai-compat embeddings request failed (status {status}): {message}"
2805        )));
2806    }
2807
2808    let parsed = parse_openai_embedding_response(&body)?;
2809    Ok(OpenAiCompatEmbeddingsResponse {
2810        model: parsed.model,
2811        embeddings: parsed.embeddings,
2812        usage: OpenAiCompatUsage {
2813            input_tokens: parsed.prompt_tokens,
2814            output_tokens: None,
2815            total_tokens: parsed.total_tokens,
2816        },
2817    })
2818}
2819
2820// ============================================================================
2821// Provider mode selector (issue gh-516)
2822//
2823// `red.config.ai.provider` picks the wire-protocol family that engine
2824// consumers (currently AskPipeline) should use. This is intentionally
2825// distinct from `red.config.ai.default.provider`, which names a
2826// concrete vendor (openai, groq, ollama, ...). The mode selector
2827// answers the prior question of which HTTP shape to speak.
2828// ============================================================================
2829
2830/// Wire-protocol family used by engine-side AI consumers.
2831#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2832pub enum AiProviderMode {
2833    /// Generic OpenAI-compatible client (`POST {api_base}/chat/completions`).
2834    OpenAiCompat,
2835    /// Vendor-native OpenAI client (api.openai.com, default headers).
2836    OpenAiNative,
2837    /// Vendor-native Anthropic client (api.anthropic.com, x-api-key).
2838    AnthropicNative,
2839}
2840
2841impl AiProviderMode {
2842    pub fn token(&self) -> &'static str {
2843        match self {
2844            Self::OpenAiCompat => "openai-compat",
2845            Self::OpenAiNative => "openai-native",
2846            Self::AnthropicNative => "anthropic-native",
2847        }
2848    }
2849}
2850
2851/// Parse a mode token. Accepts hyphen or underscore spellings.
2852pub fn parse_provider_mode(name: &str) -> Option<AiProviderMode> {
2853    match name.trim().to_ascii_lowercase().as_str() {
2854        "openai-compat" | "openai_compat" | "openaicompat" => Some(AiProviderMode::OpenAiCompat),
2855        "openai-native" | "openai_native" | "openainative" => Some(AiProviderMode::OpenAiNative),
2856        "anthropic-native" | "anthropic_native" | "anthropicnative" => {
2857            Some(AiProviderMode::AnthropicNative)
2858        }
2859        _ => None,
2860    }
2861}
2862
2863/// Resolve the provider mode. Lookup chain:
2864/// 1. `REDDB_AI_PROVIDER_MODE` env var.
2865/// 2. `red_config` KV key `red.config.ai.provider`.
2866/// 3. Returns `None` so callers can fall back to their existing
2867///    vendor-based routing.
2868pub fn resolve_provider_mode<F>(kv_getter: &F) -> Option<AiProviderMode>
2869where
2870    F: Fn(&str) -> crate::RedDBResult<Option<String>>,
2871{
2872    if let Ok(value) = std::env::var("REDDB_AI_PROVIDER_MODE") {
2873        if let Some(mode) = parse_provider_mode(&value) {
2874            return Some(mode);
2875        }
2876    }
2877    if let Ok(Some(value)) = kv_getter("red.config.ai.provider") {
2878        if let Some(mode) = parse_provider_mode(&value) {
2879            return Some(mode);
2880        }
2881    }
2882    None
2883}
2884
2885/// Map a mode to the matching [`AiProvider`] variant. `OpenAiCompat`
2886/// stays as a `Custom("")` marker — callers must resolve the actual
2887/// api_base separately (typically via `resolve_api_base_with_kv`).
2888pub fn provider_mode_to_provider(mode: AiProviderMode) -> AiProvider {
2889    match mode {
2890        AiProviderMode::OpenAiNative => AiProvider::OpenAi,
2891        AiProviderMode::AnthropicNative => AiProvider::Anthropic,
2892        AiProviderMode::OpenAiCompat => AiProvider::Custom(String::new()),
2893    }
2894}