// ai_lib/provider/generic.rs

use super::config::ProviderConfig;
use crate::api::{
    ChatCompletionChunk, ChatProvider, ChoiceDelta, MessageDelta, ModelInfo, ModelPermission,
};
use crate::metrics::{Metrics, NoopMetrics};
use crate::transport::{DynHttpTransportRef, HttpTransport};
use crate::types::{
    AiLibError, ChatCompletionRequest, ChatCompletionResponse, Choice, Message, Role, Usage,
    UsageStatus,
};
use futures::stream::{Stream, StreamExt};
use std::env;
use std::sync::Arc;

/// Configuration-driven generic adapter for OpenAI-compatible APIs
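///
/// Usage sketch (marked `ignore`: how a `ProviderConfig` is obtained is outside this
/// module, and the request construction is illustrative only):
///
/// ```ignore
/// let config: ProviderConfig = /* load or build a provider config */;
/// let adapter = GenericAdapter::new(config)?;
/// let response = adapter.chat(request).await?;
/// println!("{}", response.choices[0].message.content.as_text());
/// ```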
pub struct GenericAdapter {
    transport: DynHttpTransportRef,
    config: ProviderConfig,
    api_key: Option<String>,
    metrics: Arc<dyn Metrics>,
}

#[cfg(all(test, not(feature = "unified_sse")))]
mod legacy_sse_tests {
    use super::*;

    #[test]
    fn legacy_event_sequence_non_ascii() {
        let event1 = "data: {\"id\":\"1\",\"object\":\"chat.completion.chunk\",\"created\":0,\"model\":\"m\",\"choices\":[{\"delta\":{\"role\":\"assistant\",\"content\":\"你好,\"}}]}\n\n";
        let event2 = "data: {\"id\":\"2\",\"object\":\"chat.completion.chunk\",\"created\":0,\"model\":\"m\",\"choices\":[{\"delta\":{\"content\":\"世界!\"}}]}\n\n";
        let mut buffer = [event1.as_bytes(), event2.as_bytes()].concat();
        let mut out: Vec<String> = Vec::new();
        while let Some(boundary) = GenericAdapter::find_event_boundary(&buffer) {
            let event_bytes = buffer.drain(..boundary).collect::<Vec<_>>();
            if let Ok(event_text) = std::str::from_utf8(&event_bytes) {
                if let Some(parsed) = GenericAdapter::parse_sse_event(event_text) {
                    let chunk = parsed.expect("ok").expect("chunk");
                    if let Some(c) = &chunk.choices[0].delta.content {
                        out.push(c.clone());
                    }
                }
            }
        }
        assert_eq!(out, vec!["你好,".to_string(), "世界!".to_string()]);
    }
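
    // Minimal check of the delimiter handling in `find_event_boundary`: an event ends at a
    // blank line, encoded as either "\n\n" or "\r\n\r\n"; a partial event yields no boundary.
    #[test]
    fn legacy_event_boundary_lf_and_crlf() {
        assert_eq!(
            GenericAdapter::find_event_boundary(b"data: {}\n\nrest"),
            Some(10)
        );
        assert_eq!(
            GenericAdapter::find_event_boundary(b"data: {}\r\n\r\nrest"),
            Some(12)
        );
        assert_eq!(GenericAdapter::find_event_boundary(b"data: {}\n"), None);
    }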
}

impl GenericAdapter {
    pub fn new(config: ProviderConfig) -> Result<Self, AiLibError> {
        // Validate configuration
        config.validate()?;

        // For generic/config-driven providers we treat the API key as optional.
        // Some deployments (e.g. local Ollama) don't require a key. If the env var
        // is missing we continue with None and callers will simply omit auth headers.
        let api_key = env::var(&config.api_key_env).ok();

        Ok(Self {
            transport: HttpTransport::new_without_proxy().boxed(),
            config,
            api_key,
            metrics: Arc::new(NoopMetrics::new()),
        })
    }

    /// Create adapter with an explicit API key override (takes precedence over env var).
    pub fn new_with_api_key(
        config: ProviderConfig,
        api_key_override: Option<String>,
    ) -> Result<Self, AiLibError> {
        config.validate()?;
        let api_key = api_key_override.or_else(|| env::var(&config.api_key_env).ok());
        Ok(Self {
            transport: HttpTransport::new_without_proxy().boxed(),
            config,
            api_key,
            metrics: Arc::new(NoopMetrics::new()),
        })
    }

    /// Create adapter with custom transport layer (for testing)
    pub fn with_transport(
        config: ProviderConfig,
        transport: HttpTransport,
    ) -> Result<Self, AiLibError> {
        // Validate configuration
        config.validate()?;

        let api_key = env::var(&config.api_key_env).ok();

        Ok(Self {
            transport: transport.boxed(),
            config,
            api_key,
            metrics: Arc::new(NoopMetrics::new()),
        })
    }

    /// Custom transport + API key override.
    pub fn with_transport_api_key(
        config: ProviderConfig,
        transport: HttpTransport,
        api_key_override: Option<String>,
    ) -> Result<Self, AiLibError> {
        config.validate()?;
        let api_key = api_key_override.or_else(|| env::var(&config.api_key_env).ok());
        Ok(Self {
            transport: transport.boxed(),
            config,
            api_key,
            metrics: Arc::new(NoopMetrics::new()),
        })
    }

    /// Accept an object-safe transport reference directly
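    ///
    /// Sketch (marked `ignore`; `config` is a `ProviderConfig` validated by this constructor):
    ///
    /// ```ignore
    /// let transport = HttpTransport::new_without_proxy().boxed();
    /// let adapter = GenericAdapter::with_transport_ref(config, transport)?;
    /// ```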
    pub fn with_transport_ref(
        config: ProviderConfig,
        transport: DynHttpTransportRef,
    ) -> Result<Self, AiLibError> {
        // Validate configuration
        config.validate()?;

        let api_key = env::var(&config.api_key_env).ok();
        Ok(Self {
            transport,
            config,
            api_key,
            metrics: Arc::new(NoopMetrics::new()),
        })
    }

    /// Object-safe transport + API key override.
    pub fn with_transport_ref_api_key(
        config: ProviderConfig,
        transport: DynHttpTransportRef,
        api_key_override: Option<String>,
    ) -> Result<Self, AiLibError> {
        config.validate()?;
        let api_key = api_key_override.or_else(|| env::var(&config.api_key_env).ok());
        Ok(Self {
            transport,
            config,
            api_key,
            metrics: Arc::new(NoopMetrics::new()),
        })
    }

    /// Create adapter with custom transport and an injected metrics implementation
    pub fn with_transport_ref_and_metrics(
        config: ProviderConfig,
        transport: DynHttpTransportRef,
        metrics: Arc<dyn Metrics>,
    ) -> Result<Self, AiLibError> {
        // Validate configuration
        config.validate()?;

        let api_key = env::var(&config.api_key_env).ok();
        Ok(Self {
            transport,
            config,
            api_key,
            metrics,
        })
    }

    /// Create adapter with injected metrics (uses default HttpTransport)
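    ///
    /// Sketch (marked `ignore`; obtaining a `ProviderConfig` is up to the caller):
    ///
    /// ```ignore
    /// let metrics = Arc::new(NoopMetrics::new()); // or any custom `Metrics` implementation
    /// let adapter = GenericAdapter::with_metrics(config, metrics)?;
    /// ```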
    pub fn with_metrics(
        config: ProviderConfig,
        metrics: Arc<dyn Metrics>,
    ) -> Result<Self, AiLibError> {
        // Validate configuration
        config.validate()?;

        let api_key = env::var(&config.api_key_env).ok();
        Ok(Self {
            transport: HttpTransport::new().boxed(),
            config,
            api_key,
            metrics,
        })
    }

    /// Convert generic request to provider-specific format (async: may upload local files)
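    ///
    /// The produced body follows the OpenAI-compatible chat schema, roughly (optional fields
    /// only appear when set on the request):
    ///
    /// ```json
    /// {
    ///   "model": "...",
    ///   "messages": [{"role": "user", "content": "..."}],
    ///   "temperature": 0.7,
    ///   "max_tokens": 256,
    ///   "functions": [{"name": "...", "description": "...", "parameters": {}}],
    ///   "function_call": "auto"
    /// }
    /// ```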
    async fn convert_request(
        &self,
        request: &ChatCompletionRequest,
    ) -> Result<serde_json::Value, AiLibError> {
        let default_role = "user".to_string();

        // Build messages array; may perform uploads for local files
        let mut messages: Vec<serde_json::Value> = Vec::with_capacity(request.messages.len());
        for msg in request.messages.iter() {
            let role_key = format!("{:?}", msg.role);
            let mapped_role = self
                .config
                .field_mapping
                .role_mapping
                .get(&role_key)
                .unwrap_or(&default_role)
                .clone();

            // Handle multimodal: if image has no url but has a name and upload endpoint configured, upload it
            let content_val = match &msg.content {
                crate::types::common::Content::Image {
                    url,
                    mime: _mime,
                    name,
                } => {
                    if url.is_some() {
                        crate::provider::utils::content_to_provider_value(&msg.content)
                    } else if let Some(n) = name {
                        if let Some(upload_ep) = &self.config.upload_endpoint {
                            let upload_url = format!(
                                "{}{}",
                                self.config.base_url.trim_end_matches('/'),
                                upload_ep
                            );
                            // Decide whether to upload or inline based on configured size limit.
                            let should_upload = match self.config.upload_size_limit {
                                Some(limit) => match std::fs::metadata(n) {
                                    Ok(meta) => meta.len() > limit,
                                    Err(_) => true, // if we can't stat the file, attempt upload
                                },
                                None => true, // default: upload if no limit configured (preserve prior behavior)
                            };

                            if should_upload {
                                // Use the injected transport when available so tests can mock uploads.
                                match crate::provider::utils::upload_file_with_transport(
                                    Some(self.transport.clone()),
                                    &upload_url,
                                    n,
                                    "file",
                                )
                                .await
                                {
                                    Ok(remote_url) => {
                                        if remote_url.starts_with("http://")
                                            || remote_url.starts_with("https://")
                                            || remote_url.starts_with("data:")
                                        {
                                            serde_json::json!({"image": {"url": remote_url}})
                                        } else {
                                            serde_json::json!({"image": {"file_id": remote_url}})
                                        }
                                    }
                                    Err(_) => crate::provider::utils::content_to_provider_value(
                                        &msg.content,
                                    ),
                                }
                            } else {
                                // Inline small files as data URLs
                                crate::provider::utils::content_to_provider_value(&msg.content)
                            }
                        } else {
                            crate::provider::utils::content_to_provider_value(&msg.content)
                        }
                    } else {
                        crate::provider::utils::content_to_provider_value(&msg.content)
                    }
                }
                _ => crate::provider::utils::content_to_provider_value(&msg.content),
            };

            messages.push(serde_json::json!({"role": mapped_role, "content": content_val}));
        }

        // Use string literals as JSON keys
        let mut provider_request = serde_json::json!({
            "model": request.model,
            "messages": messages
        });

        // Add optional parameters
        if let Some(temp) = request.temperature {
            provider_request["temperature"] =
                serde_json::Value::Number(serde_json::Number::from_f64(temp.into()).unwrap());
        }
        if let Some(max_tokens) = request.max_tokens {
            provider_request["max_tokens"] =
                serde_json::Value::Number(serde_json::Number::from(max_tokens));
        }
        if let Some(top_p) = request.top_p {
            provider_request["top_p"] =
                serde_json::Value::Number(serde_json::Number::from_f64(top_p.into()).unwrap());
        }
        if let Some(freq_penalty) = request.frequency_penalty {
            provider_request["frequency_penalty"] = serde_json::Value::Number(
                serde_json::Number::from_f64(freq_penalty.into()).unwrap(),
            );
        }
        if let Some(presence_penalty) = request.presence_penalty {
            provider_request["presence_penalty"] = serde_json::Value::Number(
                serde_json::Number::from_f64(presence_penalty.into()).unwrap(),
            );
        }

        // Function calling (OpenAI-compatible). Many config-driven providers accept this schema.
        if let Some(funcs) = &request.functions {
            let mapped: Vec<serde_json::Value> = funcs
                .iter()
                .map(|t| {
                    serde_json::json!({
                        "name": t.name,
                        "description": t.description,
                        "parameters": t.parameters.clone().unwrap_or(serde_json::json!({}))
                    })
                })
                .collect();
            provider_request["functions"] = serde_json::Value::Array(mapped);
        }

        if let Some(policy) = &request.function_call {
            match policy {
                crate::types::FunctionCallPolicy::Auto(name) => {
                    if name == "auto" {
                        provider_request["function_call"] =
                            serde_json::Value::String("auto".to_string());
                    } else {
                        provider_request["function_call"] = serde_json::json!({"name": name});
                    }
                }
                crate::types::FunctionCallPolicy::None => {
                    provider_request["function_call"] =
                        serde_json::Value::String("none".to_string());
                }
            }
        }

        request.apply_extensions(&mut provider_request);

        Ok(provider_request)
    }

    /// Find event boundary
    /// Deprecated path (legacy SSE helper). Prefer `crate::sse::parser` when `unified_sse` is enabled.
    #[cfg(not(feature = "unified_sse"))]
    fn find_event_boundary(buffer: &[u8]) -> Option<usize> {
        let mut i = 0;
        while i < buffer.len().saturating_sub(1) {
            if buffer[i] == b'\n' && buffer[i + 1] == b'\n' {
                return Some(i + 2);
            }
            if i < buffer.len().saturating_sub(3)
                && buffer[i] == b'\r'
                && buffer[i + 1] == b'\n'
                && buffer[i + 2] == b'\r'
                && buffer[i + 3] == b'\n'
            {
                return Some(i + 4);
            }
            i += 1;
        }
        None
    }

    /// Parse SSE event
    /// Deprecated path (legacy SSE helper). Prefer `crate::sse::parser` when `unified_sse` is enabled.
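    /// Returns `Some(Ok(None))` for the terminal `data: [DONE]` event and `None` when the
    /// event contains no `data:` line.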
    #[cfg(not(feature = "unified_sse"))]
    fn parse_sse_event(
        event_text: &str,
    ) -> Option<Result<Option<ChatCompletionChunk>, AiLibError>> {
        for line in event_text.lines() {
            let line = line.trim();
            if let Some(stripped) = line.strip_prefix("data: ") {
                let data = stripped;
                if data == "[DONE]" {
                    return Some(Ok(None));
                }
                return Some(Self::parse_chunk_data(data));
            }
        }
        None
    }

    /// Parse chunk data
    /// Deprecated path (legacy SSE helper). Prefer `crate::sse::parser` when `unified_sse` is enabled.
    #[cfg(not(feature = "unified_sse"))]
    fn parse_chunk_data(data: &str) -> Result<Option<ChatCompletionChunk>, AiLibError> {
        match serde_json::from_str::<serde_json::Value>(data) {
            Ok(json) => {
                let choices = json["choices"]
                    .as_array()
                    .map(|arr| {
                        arr.iter()
                            .enumerate()
                            .map(|(index, choice)| {
                                let delta = &choice["delta"];
                                ChoiceDelta {
                                    index: index as u32,
                                    delta: MessageDelta {
                                        role: delta["role"].as_str().map(|r| match r {
                                            "assistant" => Role::Assistant,
                                            "user" => Role::User,
                                            "system" => Role::System,
                                            _ => Role::Assistant,
                                        }),
                                        content: delta["content"].as_str().map(str::to_string),
                                    },
                                    finish_reason: choice["finish_reason"]
                                        .as_str()
                                        .map(str::to_string),
                                }
                            })
                            .collect()
                    })
                    .unwrap_or_default();

                Ok(Some(ChatCompletionChunk {
                    id: json["id"].as_str().unwrap_or_default().to_string(),
                    object: json["object"]
                        .as_str()
                        .unwrap_or("chat.completion.chunk")
                        .to_string(),
                    created: json["created"].as_u64().unwrap_or(0),
                    model: json["model"].as_str().unwrap_or_default().to_string(),
                    choices,
                }))
            }
            Err(e) => Err(AiLibError::ProviderError(format!(
                "JSON parse error: {}",
                e
            ))),
        }
    }

    // legacy test moved to separate file under tests/ when needed

    /// Split `text` into chunks of at most `max_len` bytes, preferring to cut at spaces.
    /// Used by the streaming fallback to simulate incremental chunks from a full response.
    fn split_text_into_chunks(text: &str, max_len: usize) -> Vec<String> {
        let mut chunks = Vec::new();
        let mut start = 0;
        let bytes = text.as_bytes();
        while start < bytes.len() {
            let mut end = std::cmp::min(start + max_len, bytes.len());
            // Back the cut up to a char boundary so slicing `text` below cannot panic
            // when the byte limit falls inside a multi-byte UTF-8 character.
            while end < bytes.len() && !text.is_char_boundary(end) {
                end -= 1;
            }
            let mut cut = end;
            if end < bytes.len() {
                if let Some(pos) = text[start..end].rfind(' ') {
                    cut = start + pos;
                }
            }
            if cut == start {
                cut = end;
            }
            chunks.push(String::from_utf8_lossy(&bytes[start..cut]).to_string());
            start = cut;
            if start < bytes.len() && bytes[start] == b' ' {
                start += 1;
            }
        }
        chunks
    }

    /// Parse an OpenAI-compatible chat completion response body into a `ChatCompletionResponse`.
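    ///
    /// Expected shape (OpenAI-compatible; `function_call`/`tool_calls` on the message are optional):
    ///
    /// ```json
    /// {
    ///   "id": "...", "object": "chat.completion", "created": 0, "model": "...",
    ///   "choices": [{"index": 0, "message": {"role": "assistant", "content": "..."},
    ///                "finish_reason": "stop"}],
    ///   "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}
    /// }
    /// ```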
    fn parse_response(
        &self,
        response: serde_json::Value,
    ) -> Result<ChatCompletionResponse, AiLibError> {
        let choices = response["choices"]
            .as_array()
            .ok_or_else(|| {
                AiLibError::ProviderError("Invalid response format: choices not found".to_string())
            })?
            .iter()
            .enumerate()
            .map(|(index, choice)| {
                let message = choice["message"].as_object().ok_or_else(|| {
                    AiLibError::ProviderError("Invalid choice format".to_string())
                })?;

                let role = match message["role"].as_str().unwrap_or("user") {
                    "system" => Role::System,
                    "assistant" => Role::Assistant,
                    _ => Role::User,
                };

                let content = message["content"].as_str().unwrap_or("").to_string();

                // try to parse a function_call if present
                let mut function_call: Option<crate::types::function_call::FunctionCall> = None;
                if let Some(fc_val) = message.get("function_call") {
                    // attempt full deserialization
                    if let Ok(mut fc) = serde_json::from_value::<
                        crate::types::function_call::FunctionCall,
                    >(fc_val.clone())
                    {
                        // If the provider deserialized arguments as a JSON string, try to parse it into structured JSON.
                        if let Some(arg_val) = &fc.arguments {
                            if arg_val.is_string() {
                                if let Some(s) = arg_val.as_str() {
                                    if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s)
                                    {
                                        fc.arguments = Some(parsed);
                                    }
                                }
                            }
                        }
                        function_call = Some(fc);
                    } else {
                        // fallback: try to extract name + arguments (arguments may be a string)
                        let name = fc_val
                            .get("name")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string());
                        if let Some(name) = name {
                            let args = fc_val.get("arguments").and_then(|a| {
                                if a.is_string() {
                                    serde_json::from_str::<serde_json::Value>(a.as_str().unwrap())
                                        .ok()
                                } else {
                                    Some(a.clone())
                                }
                            });

                            function_call = Some(crate::types::function_call::FunctionCall {
                                name,
                                arguments: args,
                            });
                        }
                    }
                } else if let Some(tool_calls) =
                    message.get("tool_calls").and_then(|v| v.as_array())
                {
                    // OpenAI tool_calls format: [{"type":"function","function":{"name":...,"arguments":...}}]
                    if let Some(first) = tool_calls.first() {
                        if let Some(func) = first.get("function") {
                            if let Some(name) = func.get("name").and_then(|v| v.as_str()) {
                                let mut args_opt = func.get("arguments").cloned();
                                // If arguments is a string, attempt to parse JSON
                                if let Some(args_val) = &args_opt {
                                    if args_val.is_string() {
                                        if let Some(s) = args_val.as_str() {
                                            if let Ok(parsed) =
                                                serde_json::from_str::<serde_json::Value>(s)
                                            {
                                                args_opt = Some(parsed);
                                            }
                                        }
                                    }
                                }
                                function_call = Some(crate::types::function_call::FunctionCall {
                                    name: name.to_string(),
                                    arguments: args_opt,
                                });
                            }
                        }
                    }
                }

                Ok(Choice {
                    index: index as u32,
                    message: Message {
                        role,
                        content: crate::types::common::Content::Text(content),
                        function_call,
                    },
                    finish_reason: choice["finish_reason"].as_str().map(|s| s.to_string()),
                })
            })
            .collect::<Result<Vec<_>, AiLibError>>()?;

        let usage = response["usage"].as_object().ok_or_else(|| {
            AiLibError::ProviderError("Invalid response format: usage not found".to_string())
        })?;

        let usage = Usage {
            prompt_tokens: usage["prompt_tokens"].as_u64().unwrap_or(0) as u32,
            completion_tokens: usage["completion_tokens"].as_u64().unwrap_or(0) as u32,
            total_tokens: usage["total_tokens"].as_u64().unwrap_or(0) as u32,
        };

        Ok(ChatCompletionResponse {
            id: response["id"].as_str().unwrap_or("").to_string(),
            object: response["object"].as_str().unwrap_or("").to_string(),
            created: response["created"].as_u64().unwrap_or(0),
            model: response["model"].as_str().unwrap_or("").to_string(),
            choices,
            usage,
            usage_status: UsageStatus::Finalized, // Generic adapter assumes OpenAI-compatible format
        })
    }
}

#[async_trait::async_trait]
impl ChatProvider for GenericAdapter {
    fn name(&self) -> &str {
        "GenericAdapter"
    }

    async fn chat(
        &self,
        request: ChatCompletionRequest,
    ) -> Result<ChatCompletionResponse, AiLibError> {
        // metrics: standardized keys
        let provider_key = "generic";
        self.metrics
            .incr_counter(&crate::metrics::keys::requests(provider_key), 1)
            .await;
        let timer = self
            .metrics
            .start_timer(&crate::metrics::keys::request_duration_ms(provider_key))
            .await;

        // Build request body & headers
        let url = self.config.chat_url();
        let provider_request = self.convert_request(&request).await?;
        let mut headers = self.config.headers.clone();
        if let Some(key) = &self.api_key {
            if self.config.base_url.contains("anthropic.com") {
                headers.insert("x-api-key".to_string(), key.clone());
                // Anthropic requires version header per API docs
                headers.insert("anthropic-version".to_string(), "2023-06-01".to_string());
            } else {
                headers.insert("Authorization".to_string(), format!("Bearer {}", key));
            }
        }

        // Use transport to allow mocking in tests
        let response_json = self
            .transport
            .post_json(&url, Some(headers), provider_request)
            .await
            .map_err(|e| e.with_context(&format!("GenericAdapter chat request to {}", url)))?;

        // Stop timer
        if let Some(t) = timer {
            t.stop();
        }

        let parsed = self.parse_response(response_json)?;

        // optional cost metrics
        #[cfg(feature = "cost_metrics")]
        {
            let usd = crate::metrics::cost::estimate_usd(
                parsed.usage.prompt_tokens,
                parsed.usage.completion_tokens,
            );
            crate::metrics::cost::record_cost(
                self.metrics.as_ref(),
                provider_key,
                &parsed.model,
                usd,
            )
            .await;
        }

        Ok(parsed)
    }

    async fn stream(
        &self,
        request: ChatCompletionRequest,
    ) -> Result<
        Box<dyn Stream<Item = Result<ChatCompletionChunk, AiLibError>> + Send + Unpin>,
        AiLibError,
    > {
        let mut stream_request = self.convert_request(&request).await?;
        stream_request["stream"] = serde_json::Value::Bool(true);
        let url = self.config.chat_url();

        let mut headers = self.config.headers.clone();
        headers.insert("Accept".to_string(), "text/event-stream".to_string());
        if let Some(key) = &self.api_key {
            if self.config.base_url.contains("anthropic.com") {
                headers.insert("x-api-key".to_string(), key.clone());
                headers.insert("anthropic-version".to_string(), "2023-06-01".to_string());
            } else {
                headers.insert("Authorization".to_string(), format!("Bearer {}", key));
            }
        }

        let byte_stream_res = self
            .transport
            .post_stream(&url, Some(headers), stream_request)
            .await;

        match byte_stream_res {
            Ok(mut byte_stream) => {
                let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
                tokio::spawn(async move {
                    let mut buffer = Vec::new();
                    while let Some(result) = byte_stream.next().await {
                        match result {
                            Ok(bytes) => {
                                buffer.extend_from_slice(&bytes);
                                #[cfg(feature = "unified_sse")]
                                {
                                    while let Some(event_end) =
                                        crate::sse::parser::find_event_boundary(&buffer)
                                    {
                                        let event_bytes =
                                            buffer.drain(..event_end).collect::<Vec<_>>();
                                        if let Ok(event_text) = std::str::from_utf8(&event_bytes) {
                                            if let Some(chunk) =
                                                crate::sse::parser::parse_sse_event(event_text)
                                            {
                                                match chunk {
                                                    Ok(Some(c)) => {
                                                        if tx.send(Ok(c)).is_err() {
                                                            return;
                                                        }
                                                    }
                                                    Ok(None) => return,
                                                    Err(e) => {
                                                        let _ = tx.send(Err(e));
                                                        return;
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                                #[cfg(not(feature = "unified_sse"))]
                                {
                                    while let Some(event_end) = Self::find_event_boundary(&buffer) {
                                        let event_bytes =
                                            buffer.drain(..event_end).collect::<Vec<_>>();
                                        if let Ok(event_text) = std::str::from_utf8(&event_bytes) {
                                            if let Some(chunk) = Self::parse_sse_event(event_text) {
                                                match chunk {
                                                    Ok(Some(c)) => {
                                                        if tx.send(Ok(c)).is_err() {
                                                            return;
                                                        }
                                                    }
                                                    Ok(None) => return,
                                                    Err(e) => {
                                                        let _ = tx.send(Err(e));
                                                        return;
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                let _ = tx.send(Err(AiLibError::ProviderError(format!(
                                    "Stream error: {}",
                                    e
                                ))));
                                break;
                            }
                        }
                    }
                });
                let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
                Ok(Box::new(Box::pin(stream)))
            }
            Err(_) => {
                // Fallback to non-streaming + simulated chunks
                let finished = self.chat(request).await?;
                let text = finished
                    .choices
                    .first()
                    .map(|c| c.message.content.as_text())
                    .unwrap_or_default();
                let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
                tokio::spawn(async move {
                    let chunks = Self::split_text_into_chunks(&text, 80);
                    for chunk in chunks {
                        let delta = ChoiceDelta {
                            index: 0,
                            delta: MessageDelta {
                                role: Some(Role::Assistant),
                                content: Some(chunk.clone()),
                            },
                            finish_reason: None,
                        };
                        let chunk_obj = ChatCompletionChunk {
                            id: "simulated".to_string(),
                            object: "chat.completion.chunk".to_string(),
                            created: 0,
                            model: finished.model.clone(),
                            choices: vec![delta],
                        };
                        if tx.send(Ok(chunk_obj)).is_err() {
                            return;
                        }
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                    }
                });
                let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
                Ok(Box::new(Box::pin(stream)))
            }
        }
    }

    async fn list_models(&self) -> Result<Vec<String>, AiLibError> {
        if let Some(models_endpoint) = &self.config.models_endpoint {
            let url = format!("{}{}", self.config.base_url, models_endpoint);
            let mut headers = self.config.headers.clone();
            if let Some(key) = &self.api_key {
                if self.config.base_url.contains("anthropic.com") {
                    headers.insert("x-api-key".to_string(), key.clone());
                } else {
                    headers.insert("Authorization".to_string(), format!("Bearer {}", key));
                }
            }
            let response = self.transport.get_json(&url, Some(headers)).await?;
            Ok(response["data"]
                .as_array()
                .unwrap_or(&vec![])
                .iter()
                .filter_map(|model| model["id"].as_str().map(|s| s.to_string()))
                .collect())
        } else {
            Err(AiLibError::ProviderError(
                "Models endpoint not configured".to_string(),
            ))
        }
    }

    async fn get_model_info(&self, model_id: &str) -> Result<ModelInfo, AiLibError> {
        Ok(ModelInfo {
            id: model_id.to_string(),
            object: "model".to_string(),
            created: 0,
            owned_by: "generic".to_string(),
            permission: vec![ModelPermission {
                id: "default".to_string(),
                object: "model_permission".to_string(),
                created: 0,
                allow_create_engine: false,
                allow_sampling: true,
                allow_logprobs: false,
                allow_search_indices: false,
                allow_view: true,
                allow_fine_tuning: false,
                organization: "*".to_string(),
                group: None,
                is_blocking: false,
            }],
        })
    }
}
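
#[cfg(test)]
mod chunking_tests {
    use super::*;

    // `split_text_into_chunks` backs the simulated-streaming fallback; these checks cover the
    // space-preferring cut and the UTF-8 boundary handling.
    #[test]
    fn split_prefers_space_boundaries() {
        let chunks = GenericAdapter::split_text_into_chunks("hello world foo bar", 7);
        assert_eq!(chunks, vec!["hello", "world", "foo bar"]);
    }

    #[test]
    fn split_never_cuts_inside_a_utf8_char() {
        let chunks = GenericAdapter::split_text_into_chunks("你好世界", 4);
        assert_eq!(chunks.concat(), "你好世界");
    }
}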