Skip to main content

claudius/
client.rs

1use std::collections::VecDeque;
2use std::env;
3use std::error::Error as StdError;
4use std::fs;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use std::time::{Duration, Instant};
9
10use bytes::Bytes;
11use futures::{Stream, StreamExt, stream};
12use reqwest::header::{HeaderMap, HeaderValue};
13use reqwest::{Client as ReqwestClient, Response, header};
14use serde::Deserialize;
15use tokio::time::sleep;
16
17use crate::AccumulatingStream;
18use crate::backoff::ExponentialBackoff;
19use crate::client_logger::ClientLogger;
20use crate::error::{Error, Result};
21use crate::observability::{
22    CLIENT_REQUEST_DURATION, CLIENT_REQUEST_ERRORS, CLIENT_REQUEST_RETRIES, CLIENT_REQUESTS,
23    CLIENT_RETRY_BACKOFF,
24};
25use crate::sse::process_message_stream_sse;
26use crate::types::{
27    DeletedMessageBatch, Message, MessageBatch, MessageBatchCreateParams, MessageBatchListParams,
28    MessageBatchListResponse, MessageBatchResult, MessageCountTokensParams, MessageCreateParams,
29    MessageStreamEvent, MessageTokensCount, ModelInfo, ModelListParams, ModelListResponse,
30};
31
32/// A stream wrapper that logs events and the final message through a [`ClientLogger`].
33///
34/// This stream passes through all events from the underlying [`AccumulatingStream`],
35/// logging each event as it occurs and logging the final reconstructed message
36/// when the stream completes.
37pub struct LoggingStream<'a> {
38    inner: AccumulatingStream,
39    logger: &'a dyn ClientLogger,
40    receiver: Option<tokio::sync::oneshot::Receiver<Result<Message>>>,
41}
42
43impl<'a> LoggingStream<'a> {
44    /// Create a new logging stream wrapper.
45    fn new(
46        inner: AccumulatingStream,
47        receiver: tokio::sync::oneshot::Receiver<Result<Message>>,
48        logger: &'a dyn ClientLogger,
49    ) -> Self {
50        Self {
51            inner,
52            logger,
53            receiver: Some(receiver),
54        }
55    }
56}
57
58impl Stream for LoggingStream<'_> {
59    type Item = Result<MessageStreamEvent>;
60
61    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62        let inner = Pin::new(&mut self.inner);
63        match inner.poll_next(cx) {
64            Poll::Ready(Some(Ok(event))) => {
65                self.logger.log_stream_event(&event);
66                Poll::Ready(Some(Ok(event)))
67            }
68            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
69            Poll::Ready(None) => {
70                // Stream ended - try to get the accumulated message
71                if let Some(mut receiver) = self.receiver.take()
72                    && let Ok(Ok(ref message)) = receiver.try_recv()
73                {
74                    self.logger.log_stream_message(message);
75                }
76                Poll::Ready(None)
77            }
78            Poll::Pending => Poll::Pending,
79        }
80    }
81}
82
83const DEFAULT_API_URL: &str = "https://api.anthropic.com";
84const ANTHROPIC_API_VERSION: &str = "2023-06-01";
85/// Default connect/read inactivity timeout shared by all requests.
86const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
87const STRUCTURED_OUTPUTS_BETA: &str = "structured-outputs-2025-11-13";
88
89fn stream_debug_enabled() -> bool {
90    env::var_os("CLAUDIUS_DEBUG_STREAM").is_some()
91}
92
93fn debug_stream_request(url: &str, params: &MessageCreateParams) {
94    if !stream_debug_enabled() {
95        return;
96    }
97
98    match serde_json::to_string_pretty(params) {
99        Ok(body) => eprintln!("[claudius-debug] stream request POST {url}\n{body}"),
100        Err(err) => eprintln!("[claudius-debug] failed to serialize stream request: {err}"),
101    }
102}
103
104fn format_reqwest_error(err: &reqwest::Error) -> String {
105    let mut parts = vec![err.to_string()];
106    let mut source = StdError::source(err);
107    while let Some(inner) = source {
108        let detail = inner.to_string();
109        if !parts.iter().any(|part| part == &detail) {
110            parts.push(detail);
111        }
112        source = inner.source();
113    }
114    parts.join(": ")
115}
116
117const MAX_MESSAGE_BATCH_RESULT_LINE_BYTES: usize = 64 * 1024 * 1024;
118
119struct MessageBatchJsonlState<S> {
120    byte_stream: S,
121    buffer: Vec<u8>,
122    pending_lines: VecDeque<Vec<u8>>,
123    finished: bool,
124}
125
126fn map_batch_result_stream_error(err: reqwest::Error) -> Error {
127    let details = format_reqwest_error(&err);
128    if err.is_timeout() {
129        Error::timeout(
130            format!("Message batch results stream timed out: {details}"),
131            None,
132        )
133    } else if err.is_connect() {
134        Error::connection(
135            format!("Message batch results stream connection error: {details}"),
136            Some(Box::new(err)),
137        )
138    } else {
139        Error::streaming(
140            format!("Error in message batch results stream: {details}"),
141            Some(Box::new(err)),
142        )
143    }
144}
145
146fn parse_message_batch_result_line(line: &[u8]) -> Result<MessageBatchResult> {
147    let text = std::str::from_utf8(line).map_err(|e| {
148        Error::encoding(
149            format!("Invalid UTF-8 in message batch results JSONL: {e}"),
150            Some(Box::new(e)),
151        )
152    })?;
153
154    serde_json::from_str::<MessageBatchResult>(text).map_err(|e| {
155        Error::serialization(
156            format!("Failed to parse message batch results JSONL line: {e}"),
157            Some(Box::new(e)),
158        )
159    })
160}
161
162fn trim_jsonl_line(mut line: Vec<u8>) -> Vec<u8> {
163    if line.ends_with(b"\n") {
164        line.pop();
165    }
166    if line.ends_with(b"\r") {
167        line.pop();
168    }
169    line
170}
171
172fn process_message_batch_result_jsonl<S>(
173    byte_stream: S,
174) -> impl Stream<Item = Result<MessageBatchResult>>
175where
176    S: Stream<Item = std::result::Result<Bytes, reqwest::Error>> + Unpin + 'static,
177{
178    let state = MessageBatchJsonlState {
179        byte_stream,
180        buffer: Vec::new(),
181        pending_lines: VecDeque::new(),
182        finished: false,
183    };
184
185    stream::unfold(state, |mut state| async move {
186        loop {
187            if let Some(line) = state.pending_lines.pop_front() {
188                if line.is_empty() {
189                    continue;
190                }
191                return Some((parse_message_batch_result_line(&line), state));
192            }
193
194            if state.finished {
195                if state.buffer.is_empty() {
196                    return None;
197                }
198                let line = trim_jsonl_line(std::mem::take(&mut state.buffer));
199                if line.is_empty() {
200                    continue;
201                }
202                return Some((parse_message_batch_result_line(&line), state));
203            }
204
205            match state.byte_stream.next().await {
206                Some(Ok(bytes)) => {
207                    state.buffer.extend_from_slice(&bytes);
208                    if state.buffer.len() > MAX_MESSAGE_BATCH_RESULT_LINE_BYTES {
209                        state.buffer.clear();
210                        state.finished = true;
211                        return Some((
212                            Err(Error::streaming(
213                                format!(
214                                    "Message batch results JSONL line exceeded maximum size of {} bytes",
215                                    MAX_MESSAGE_BATCH_RESULT_LINE_BYTES
216                                ),
217                                None,
218                            )),
219                            state,
220                        ));
221                    }
222
223                    while let Some(newline) = state.buffer.iter().position(|byte| *byte == b'\n') {
224                        let line = trim_jsonl_line(state.buffer.drain(..=newline).collect());
225                        if !line.is_empty() {
226                            state.pending_lines.push_back(line);
227                        }
228                    }
229                }
230                Some(Err(err)) => {
231                    state.finished = true;
232                    return Some((Err(map_batch_result_stream_error(err)), state));
233                }
234                None => {
235                    state.finished = true;
236                }
237            }
238        }
239    })
240}
241
242/// Client for the Anthropic API with performance optimizations.
243#[derive(Debug, Clone)]
244pub struct Anthropic {
245    api_key: String,
246    client: ReqwestClient,
247    base_url: String,
248    timeout: Duration,
249    max_retries: usize,
250    throughput_ops_sec: f64,
251    reserve_capacity: f64,
252    /// Cached headers for performance - Arc for cheap cloning
253    cached_headers: Arc<HeaderMap>,
254    /// Beta feature headers included on every request.
255    default_betas: Vec<String>,
256}
257
258impl Anthropic {
259    fn build_http_client(timeout: Duration) -> Result<ReqwestClient> {
260        ReqwestClient::builder()
261            .connect_timeout(timeout)
262            .read_timeout(timeout)
263            .pool_max_idle_per_host(10) // Connection pooling optimization
264            .pool_idle_timeout(Duration::from_secs(90))
265            .tcp_keepalive(Duration::from_secs(60))
266            .build()
267            .map_err(|e| {
268                Error::http_client(
269                    format!("Failed to build HTTP client: {e}"),
270                    Some(Box::new(e)),
271                )
272            })
273    }
274
275    /// Resolve an API key value, handling file:// URLs
276    fn resolve_api_key(key_value: &str) -> Result<String> {
277        if let Some(stripped) = key_value.strip_prefix("file://") {
278            // Handle file:// URLs
279            let path = if stripped.starts_with('/') {
280                // Absolute path: file:///root/.env -> /root/.env
281                stripped.to_string()
282            } else {
283                // Relative path: file://../foo -> ../foo
284                stripped.to_string()
285            };
286
287            fs::read_to_string(&path)
288                .map(|content| content.trim().to_string())
289                .map_err(|e| {
290                    Error::validation(
291                        format!("Failed to read API key from file '{}': {}", path, e),
292                        Some("api_key".to_string()),
293                    )
294                })
295        } else {
296            // Regular API key value
297            Ok(key_value.to_string())
298        }
299    }
300
301    /// Create a new Anthropic client.
302    ///
303    /// The API key can be provided directly or read from the CLAUDIUS_API_KEY or ANTHROPIC_API_KEY
304    /// environment variables. If an environment variable value starts with "file://", it will be
305    /// treated as a file path and the API key will be read from that file.
306    ///
307    /// The base URL is resolved from the CLAUDIUS_BASE_URL or ANTHROPIC_BASE_URL environment
308    /// variables, in that order. If neither is set, the default Anthropic API URL is used.
309    pub fn new(api_key: Option<String>) -> Result<Self> {
310        let api_key = match api_key {
311            Some(key) => Self::resolve_api_key(&key)?,
312            None => match env::var("CLAUDIUS_API_KEY").ok() {
313                Some(key) => Self::resolve_api_key(&key)?,
314                None => {
315                    let env_key = env::var("ANTHROPIC_API_KEY").map_err(|_| {
316                        Error::authentication(
317                            "API key not provided and ANTHROPIC_API_KEY environment variable not set",
318                        )
319                    })?;
320                    Self::resolve_api_key(&env_key)?
321                }
322            },
323        };
324
325        let timeout = DEFAULT_TIMEOUT;
326        let client = Self::build_http_client(timeout)?;
327
328        // Pre-build headers for performance
329        let cached_headers = Arc::new(Self::build_default_headers(&api_key)?);
330
331        // Resolve base URL from environment variables, defaulting to the API URL
332        let base_url = env::var("CLAUDIUS_BASE_URL")
333            .or_else(|_| env::var("ANTHROPIC_BASE_URL"))
334            .unwrap_or_else(|_| DEFAULT_API_URL.to_string());
335
336        Ok(Self {
337            api_key,
338            client,
339            base_url,
340            timeout,
341            max_retries: 3,
342            throughput_ops_sec: 1.0 / 60.0,
343            reserve_capacity: 1.0 / 60.0,
344            cached_headers,
345            default_betas: Vec::new(),
346        })
347    }
348
349    /// Set a custom base URL for this client.
350    ///
351    /// This method allows you to specify a different API endpoint for the client.
352    /// The base URL should be the root URL without the `/v1/` suffix - this will
353    /// be added automatically when constructing request URLs.
354    ///
355    /// # Examples
356    ///
357    /// ```
358    /// # use claudius::Anthropic;
359    /// // For Anthropic's API (default)
360    /// let client = Anthropic::new(Some("api-key".to_string()))?
361    ///     .with_base_url("https://api.anthropic.com".to_string());
362    ///
363    /// // For Minimax (international)
364    /// let client = Anthropic::new(Some("api-key".to_string()))?
365    ///     .with_base_url("https://api.minimax.io/anthropic".to_string());
366    ///
367    /// // For Minimax (China)
368    /// let client = Anthropic::new(Some("api-key".to_string()))?
369    ///     .with_base_url("https://api.minimaxi.com/anthropic".to_string());
370    /// # Ok::<(), claudius::Error>(())
371    /// ```
372    pub fn with_base_url(mut self, base_url: String) -> Self {
373        self.base_url = base_url;
374        self
375    }
376
377    /// Set a custom timeout for this client.
378    ///
379    /// This method allows you to specify a different connect/read inactivity
380    /// timeout for API requests.
381    pub fn with_timeout(mut self, timeout: Duration) -> Result<Self> {
382        self.timeout = timeout;
383
384        self.client = Self::build_http_client(timeout).map_err(|e| match e {
385            Error::HttpClient { source, .. } => Error::http_client(
386                "Failed to build HTTP client with new timeout",
387                source.map(|src| Box::new(src) as Box<dyn std::error::Error + Send + Sync>),
388            ),
389            other => other,
390        })?;
391        Ok(self)
392    }
393
394    /// Set the maximum number of retries for this client.
395    ///
396    /// This method allows you to specify how many times to retry failed requests.
397    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
398        self.max_retries = max_retries;
399        self
400    }
401
402    /// Get the API key being used by this client.
403    pub fn api_key(&self) -> &str {
404        &self.api_key
405    }
406
407    /// Set the backoff parameters for this client.
408    ///
409    /// This method allows you to configure the exponential backoff algorithm.
410    pub fn with_backoff_params(mut self, throughput_ops_sec: f64, reserve_capacity: f64) -> Self {
411        self.throughput_ops_sec = throughput_ops_sec;
412        self.reserve_capacity = reserve_capacity;
413        self
414    }
415
416    /// Set default beta feature headers included on every request.
417    ///
418    /// These are merged with any per-request betas and auto-detected betas
419    /// (like `structured-outputs-2025-11-13`). Duplicates are removed.
420    pub fn with_default_betas(
421        mut self,
422        betas: impl IntoIterator<Item = impl Into<String>>,
423    ) -> Self {
424        self.default_betas = betas.into_iter().map(Into::into).collect();
425        self
426    }
427
428    /// Set both a custom base URL and timeout for this client.
429    ///
430    /// This is a convenience method that chains with_base_url and with_timeout.
431    pub fn with_base_url_and_timeout(self, base_url: String, timeout: Duration) -> Result<Self> {
432        self.with_base_url(base_url).with_timeout(timeout)
433    }
434
435    /// Build default headers for API requests (static method for initialization).
436    fn build_default_headers(api_key: &str) -> Result<HeaderMap> {
437        let mut headers = HeaderMap::new();
438        headers.insert(
439            header::CONTENT_TYPE,
440            HeaderValue::from_static("application/json"),
441        );
442        headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
443        headers.insert(
444            "x-api-key",
445            HeaderValue::from_str(api_key).map_err(|e| {
446                Error::validation(
447                    format!("Invalid API key format: {e}"),
448                    Some("api_key".to_string()),
449                )
450            })?,
451        );
452        headers.insert(
453            "anthropic-version",
454            HeaderValue::from_static(ANTHROPIC_API_VERSION),
455        );
456        Ok(headers)
457    }
458
459    /// Get cached headers for performance (no allocation needed).
460    fn default_headers(&self) -> HeaderMap {
461        (*self.cached_headers).clone()
462    }
463
464    /// Build a full endpoint URL from the base URL and endpoint path.
465    ///
466    /// This method handles trailing slashes gracefully and always inserts `/v1/`
467    /// between the base URL and endpoint path. This allows the base URL to be
468    /// specified without requiring a specific format (with or without trailing slash,
469    /// with or without `/v1/` suffix).
470    ///
471    /// # Examples
472    ///
473    /// - Base: `https://api.anthropic.com`, endpoint: `messages` → `https://api.anthropic.com/v1/messages`
474    /// - Base: `https://api.minimax.io/anthropic`, endpoint: `messages` → `https://api.minimax.io/anthropic/v1/messages`
475    /// - Base: `https://example.com/`, endpoint: `models` → `https://example.com/v1/models`
476    fn build_url(&self, endpoint: &str) -> String {
477        let base = self.base_url.trim_end_matches('/');
478        format!("{}/v1/{}", base, endpoint)
479    }
480
481    /// Collect all beta strings from client defaults, per-request, and auto-detected sources.
482    ///
483    /// Returns a deduplicated, ordered list.
484    fn collect_betas(&self, request_betas: Option<&[String]>, auto_betas: &[&str]) -> Vec<String> {
485        let mut seen = std::collections::HashSet::new();
486        let mut result = Vec::new();
487
488        for beta in &self.default_betas {
489            if seen.insert(beta.as_str().to_owned()) {
490                result.push(beta.clone());
491            }
492        }
493
494        if let Some(betas) = request_betas {
495            for beta in betas {
496                if seen.insert(beta.clone()) {
497                    result.push(beta.clone());
498                }
499            }
500        }
501
502        for &beta in auto_betas {
503            if seen.insert(beta.to_owned()) {
504                result.push(beta.to_owned());
505            }
506        }
507
508        result
509    }
510
511    /// Build headers with the `anthropic-beta` header set from the given betas.
512    ///
513    /// Returns `None` if there are no betas, avoiding an unnecessary header clone.
514    fn headers_with_betas(&self, betas: &[String]) -> Option<HeaderMap> {
515        if betas.is_empty() {
516            return None;
517        }
518        let mut headers = self.default_headers();
519        let value = betas.join(", ");
520        // Beta header values are ASCII, so from_str should not fail.
521        if let Ok(hv) = HeaderValue::from_str(&value) {
522            headers.insert("anthropic-beta", hv);
523        }
524        Some(headers)
525    }
526
527    /// Retry wrapper that implements exponential backoff with header-based retry-after
528    async fn retry_with_backoff<F, Fut, T>(&self, operation: F) -> Result<T>
529    where
530        F: Fn() -> Fut,
531        Fut: std::future::Future<Output = Result<T>>,
532    {
533        let backoff = ExponentialBackoff::new(self.throughput_ops_sec, self.reserve_capacity);
534        let mut last_error = None;
535
536        for attempt in 0..=self.max_retries {
537            match operation().await {
538                Ok(result) => return Ok(result),
539                Err(error) => {
540                    // Check if error is retryable
541                    if !error.is_retryable() {
542                        return Err(error);
543                    }
544
545                    // Don't sleep on the last attempt
546                    if attempt == self.max_retries {
547                        last_error = Some(error);
548                        break;
549                    }
550
551                    // Calculate backoff duration
552                    let exp_backoff_duration = backoff.next();
553
554                    // Get retry-after from error if available
555                    let header_backoff_duration = match &error {
556                        Error::RateLimit {
557                            retry_after: Some(seconds),
558                            ..
559                        } => Some(Duration::from_secs(*seconds)),
560                        Error::ServiceUnavailable {
561                            retry_after: Some(seconds),
562                            ..
563                        } => Some(Duration::from_secs(*seconds)),
564                        _ => None,
565                    };
566
567                    // Take the maximum of exponential backoff and header-based backoff
568                    let sleep_duration = match header_backoff_duration {
569                        Some(header_duration) => exp_backoff_duration.max(header_duration),
570                        None => exp_backoff_duration,
571                    };
572
573                    CLIENT_REQUEST_RETRIES.click();
574                    CLIENT_RETRY_BACKOFF.add(sleep_duration.as_secs_f64());
575                    sleep(sleep_duration).await;
576                    last_error = Some(error);
577                }
578            }
579        }
580
581        Err(last_error
582            .unwrap_or_else(|| Error::unknown("Failed after retries without capturing error")))
583    }
584
585    /// Process API response errors and convert to our Error type
586    async fn process_error_response(response: Response) -> Error {
587        let status = response.status();
588        let status_code = status.as_u16();
589
590        // Get headers we might need for error processing
591        let request_id = response
592            .headers()
593            .get("x-request-id")
594            .and_then(|val| val.to_str().ok())
595            .map(String::from);
596
597        let retry_after = response
598            .headers()
599            .get("retry-after")
600            .and_then(|val| val.to_str().ok())
601            .and_then(|val| val.parse::<u64>().ok());
602
603        // Try to parse error response body
604        #[derive(Deserialize)]
605        struct ErrorResponse {
606            error: Option<ErrorDetail>,
607        }
608
609        #[derive(Deserialize)]
610        struct ErrorDetail {
611            #[serde(rename = "type")]
612            error_type: Option<String>,
613            message: Option<String>,
614            param: Option<String>,
615        }
616
617        let error_body = match response.text().await {
618            Ok(body) => body,
619            Err(e) => {
620                return Error::http_client(
621                    format!("Failed to read error response: {e}"),
622                    Some(Box::new(e)),
623                );
624            }
625        };
626
627        // Try to parse as JSON first
628        let parsed_error = serde_json::from_str::<ErrorResponse>(&error_body).ok();
629        let error_type = parsed_error
630            .as_ref()
631            .and_then(|e| e.error.as_ref())
632            .and_then(|e| e.error_type.clone());
633        let error_message = parsed_error
634            .as_ref()
635            .and_then(|e| e.error.as_ref())
636            .and_then(|e| e.message.clone())
637            .unwrap_or_else(|| error_body.clone());
638        let error_param = parsed_error
639            .as_ref()
640            .and_then(|e| e.error.as_ref())
641            .and_then(|e| e.param.clone());
642
643        // Map HTTP status code to appropriate error type
644        match status_code {
645            400 => Error::bad_request(error_message, error_param),
646            401 => Error::authentication(error_message),
647            403 => Error::permission(error_message),
648            404 => Error::not_found(error_message, None, None),
649            408 => Error::timeout(error_message, None),
650            429 => Error::rate_limit(error_message, retry_after),
651            500 => Error::internal_server(error_message, request_id),
652            502..=504 => Error::service_unavailable(error_message, retry_after),
653            529 => Error::rate_limit(error_message, retry_after),
654            _ => Error::api(status_code, error_type, error_message, request_id),
655        }
656    }
657
658    /// Convert reqwest errors to appropriate Error types
659    fn map_request_error(&self, e: reqwest::Error) -> Error {
660        let details = format_reqwest_error(&e);
661        if e.is_timeout() {
662            Error::timeout(
663                format!("Request timed out: {details}"),
664                Some(self.timeout.as_secs_f64()),
665            )
666        } else if e.is_connect() {
667            Error::connection(format!("Connection error: {details}"), Some(Box::new(e)))
668        } else {
669            Error::http_client(format!("Request failed: {details}"), Some(Box::new(e)))
670        }
671    }
672
673    fn map_response_body_error(&self, e: reqwest::Error) -> Error {
674        let details = format_reqwest_error(&e);
675        if e.is_timeout() {
676            Error::timeout(
677                format!("Response body timed out: {details}"),
678                Some(self.timeout.as_secs_f64()),
679            )
680        } else if e.is_connect() {
681            Error::connection(
682                format!("Response body connection error: {details}"),
683                Some(Box::new(e)),
684            )
685        } else {
686            Error::http_client(
687                format!("Failed to read response body: {details}"),
688                Some(Box::new(e)),
689            )
690        }
691    }
692
693    /// Execute a POST request with error handling
694    async fn execute_post_request<T: serde::de::DeserializeOwned>(
695        &self,
696        url: &str,
697        body: &impl serde::Serialize,
698        headers: Option<HeaderMap>,
699    ) -> Result<T> {
700        let headers = headers.unwrap_or_else(|| self.default_headers());
701
702        let response = self
703            .client
704            .post(url)
705            .headers(headers)
706            .json(body)
707            .send()
708            .await
709            .map_err(|e| self.map_request_error(e))?;
710
711        if !response.status().is_success() {
712            return Err(Self::process_error_response(response).await);
713        }
714
715        let body = response
716            .bytes()
717            .await
718            .map_err(|e| self.map_response_body_error(e))?;
719
720        serde_json::from_slice::<T>(&body).map_err(|e| {
721            Error::serialization(format!("Failed to parse response: {e}"), Some(Box::new(e)))
722        })
723    }
724
725    /// Execute a GET request with error handling
726    async fn execute_get_request<T: serde::de::DeserializeOwned>(
727        &self,
728        url: &str,
729        query_params: Option<&[(String, String)]>,
730        headers: Option<HeaderMap>,
731    ) -> Result<T> {
732        let headers = headers.unwrap_or_else(|| self.default_headers());
733        let mut request = self.client.get(url).headers(headers);
734
735        if let Some(params) = query_params {
736            for (key, value) in params {
737                request = request.query(&[(key, value)]);
738            }
739        }
740
741        let response = request
742            .send()
743            .await
744            .map_err(|e| self.map_request_error(e))?;
745
746        if !response.status().is_success() {
747            return Err(Self::process_error_response(response).await);
748        }
749
750        let body = response
751            .bytes()
752            .await
753            .map_err(|e| self.map_response_body_error(e))?;
754
755        serde_json::from_slice::<T>(&body).map_err(|e| {
756            Error::serialization(format!("Failed to parse response: {e}"), Some(Box::new(e)))
757        })
758    }
759
760    /// Execute an empty-body POST request with error handling.
761    async fn execute_post_empty_request<T: serde::de::DeserializeOwned>(
762        &self,
763        url: &str,
764        headers: Option<HeaderMap>,
765    ) -> Result<T> {
766        let headers = headers.unwrap_or_else(|| self.default_headers());
767
768        let response = self
769            .client
770            .post(url)
771            .headers(headers)
772            .send()
773            .await
774            .map_err(|e| self.map_request_error(e))?;
775
776        if !response.status().is_success() {
777            return Err(Self::process_error_response(response).await);
778        }
779
780        let body = response
781            .bytes()
782            .await
783            .map_err(|e| self.map_response_body_error(e))?;
784
785        serde_json::from_slice::<T>(&body).map_err(|e| {
786            Error::serialization(format!("Failed to parse response: {e}"), Some(Box::new(e)))
787        })
788    }
789
790    /// Execute a DELETE request with error handling.
791    async fn execute_delete_request<T: serde::de::DeserializeOwned>(
792        &self,
793        url: &str,
794        headers: Option<HeaderMap>,
795    ) -> Result<T> {
796        let headers = headers.unwrap_or_else(|| self.default_headers());
797
798        let response = self
799            .client
800            .delete(url)
801            .headers(headers)
802            .send()
803            .await
804            .map_err(|e| self.map_request_error(e))?;
805
806        if !response.status().is_success() {
807            return Err(Self::process_error_response(response).await);
808        }
809
810        let body = response
811            .bytes()
812            .await
813            .map_err(|e| self.map_response_body_error(e))?;
814
815        serde_json::from_slice::<T>(&body).map_err(|e| {
816            Error::serialization(format!("Failed to parse response: {e}"), Some(Box::new(e)))
817        })
818    }
819
820    /// Execute a streaming GET request with error handling.
821    async fn execute_get_stream_request(
822        &self,
823        url: &str,
824        headers: Option<HeaderMap>,
825    ) -> Result<Response> {
826        let headers = headers.unwrap_or_else(|| self.default_headers());
827
828        let response = self
829            .client
830            .get(url)
831            .headers(headers)
832            .send()
833            .await
834            .map_err(|e| self.map_request_error(e))?;
835
836        if !response.status().is_success() {
837            return Err(Self::process_error_response(response).await);
838        }
839
840        Ok(response)
841    }
842
843    /// Send a message to the API and get a non-streaming response.
844    pub async fn send(&self, mut params: MessageCreateParams) -> Result<Message> {
845        let start = Instant::now();
846        CLIENT_REQUESTS.click();
847
848        // Validate parameters first
849        if let Err(err) = params.validate() {
850            CLIENT_REQUEST_ERRORS.click();
851            CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
852            return Err(err);
853        }
854
855        // Ensure stream is disabled
856        params.stream = false;
857
858        // Collect all betas: client defaults + per-request + auto-detected
859        let auto_betas: Vec<&str> = if params.requires_structured_outputs_beta() {
860            vec![STRUCTURED_OUTPUTS_BETA]
861        } else {
862            vec![]
863        };
864        let all_betas = self.collect_betas(params.betas.as_deref(), &auto_betas);
865        let headers = self.headers_with_betas(&all_betas);
866
867        let result = self
868            .retry_with_backoff(|| async {
869                let url = self.build_url("messages");
870                self.execute_post_request(&url, &params, headers.clone())
871                    .await
872            })
873            .await;
874
875        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
876        if result.is_err() {
877            CLIENT_REQUEST_ERRORS.click();
878        }
879        result
880    }
881
882    /// Send a message to the API with logging and get a non-streaming response.
883    ///
884    /// This method is identical to [`send`](Self::send) but additionally logs
885    /// the response through the provided [`ClientLogger`].
886    pub async fn send_with_logger(
887        &self,
888        params: MessageCreateParams,
889        logger: &dyn ClientLogger,
890    ) -> Result<Message> {
891        let result = self.send(params).await;
892        if let Ok(ref message) = result {
893            logger.log_response(message);
894        }
895        result
896    }
897
898    /// Send a message to the API and get a streaming response.
899    ///
900    /// Returns a stream of MessageStreamEvent objects that can be processed incrementally.
901    pub async fn stream(
902        &self,
903        params: &MessageCreateParams,
904    ) -> Result<impl Stream<Item = Result<MessageStreamEvent>> + use<>> {
905        let start = Instant::now();
906        CLIENT_REQUESTS.click();
907
908        // Validate parameters first
909        if let Err(err) = params.validate() {
910            CLIENT_REQUEST_ERRORS.click();
911            CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
912            return Err(err);
913        }
914
915        // Ensure stream is enabled
916        if !params.stream {
917            let err = Error::validation(
918                "stream must be true for streaming requests",
919                Some("stream".to_string()),
920            );
921            CLIENT_REQUEST_ERRORS.click();
922            CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
923            return Err(err);
924        }
925
926        // Collect all betas: client defaults + per-request + auto-detected
927        let auto_betas: Vec<&str> = if params.requires_structured_outputs_beta() {
928            vec![STRUCTURED_OUTPUTS_BETA]
929        } else {
930            vec![]
931        };
932        let all_betas = self.collect_betas(params.betas.as_deref(), &auto_betas);
933
934        let response = self
935            .retry_with_backoff(|| async {
936                let url = self.build_url("messages");
937                debug_stream_request(&url, params);
938
939                let mut headers = self
940                    .headers_with_betas(&all_betas)
941                    .unwrap_or_else(|| self.default_headers());
942                headers.insert(
943                    header::ACCEPT,
944                    HeaderValue::from_static("text/event-stream"),
945                );
946
947                let response = self
948                    .client
949                    .post(&url)
950                    .headers(headers)
951                    .json(params)
952                    .send()
953                    .await
954                    .map_err(|e| self.map_request_error(e))?;
955
956                if !response.status().is_success() {
957                    return Err(Self::process_error_response(response).await);
958                }
959
960                Ok(response)
961            })
962            .await;
963
964        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
965        let response = match response {
966            Ok(response) => response,
967            Err(err) => {
968                CLIENT_REQUEST_ERRORS.click();
969                return Err(err);
970            }
971        };
972
973        // Get the byte stream from the response
974        let stream = response.bytes_stream();
975
976        // Create an SSE processor
977        Ok(process_message_stream_sse(stream))
978    }
979
980    /// Send a message to the API with logging and get a streaming response.
981    ///
982    /// This method is identical to [`stream`](Self::stream) but additionally logs
983    /// each streaming event and the final reconstructed message through the
984    /// provided [`ClientLogger`].
985    ///
986    /// Returns a [`LoggingStream`] that wraps an [`AccumulatingStream`], logging
987    /// each event as it passes through and logging the final message when the
988    /// stream completes.
989    pub async fn stream_with_logger<'a>(
990        &self,
991        params: &MessageCreateParams,
992        logger: &'a dyn ClientLogger,
993    ) -> Result<LoggingStream<'a>> {
994        let raw_stream = self.stream(params).await?;
995        let (accumulating_stream, receiver) = AccumulatingStream::new(raw_stream);
996        Ok(LoggingStream::new(accumulating_stream, receiver, logger))
997    }
998
999    /// Count tokens for a message.
1000    ///
1001    /// This method counts the number of tokens that would be used by a message with the given parameters.
1002    /// It's useful for estimating costs or making sure your messages fit within the model's context window.
1003    pub async fn count_tokens(
1004        &self,
1005        params: MessageCountTokensParams,
1006    ) -> Result<MessageTokensCount> {
1007        let start = Instant::now();
1008        CLIENT_REQUESTS.click();
1009
1010        // Collect betas: client defaults + per-request
1011        let all_betas = self.collect_betas(params.betas.as_deref(), &[]);
1012        let headers = self.headers_with_betas(&all_betas);
1013
1014        let result = self
1015            .retry_with_backoff(|| async {
1016                let url = self.build_url("messages/count_tokens");
1017                self.execute_post_request(&url, &params, headers.clone())
1018                    .await
1019            })
1020            .await;
1021
1022        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1023        if result.is_err() {
1024            CLIENT_REQUEST_ERRORS.click();
1025        }
1026        result
1027    }
1028
1029    /// Create a Message Batch for asynchronous processing.
1030    pub async fn create_message_batch(
1031        &self,
1032        params: MessageBatchCreateParams,
1033    ) -> Result<MessageBatch> {
1034        let start = Instant::now();
1035        CLIENT_REQUESTS.click();
1036
1037        if let Err(err) = params.validate() {
1038            CLIENT_REQUEST_ERRORS.click();
1039            CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1040            return Err(err);
1041        }
1042
1043        let mut request_betas = Vec::new();
1044        if let Some(betas) = &params.betas {
1045            request_betas.extend(betas.iter().cloned());
1046        }
1047        for request in &params.requests {
1048            if let Some(betas) = &request.params.betas {
1049                request_betas.extend(betas.iter().cloned());
1050            }
1051        }
1052
1053        let auto_betas: Vec<&str> = if params
1054            .requests
1055            .iter()
1056            .any(|request| request.params.requires_structured_outputs_beta())
1057        {
1058            vec![STRUCTURED_OUTPUTS_BETA]
1059        } else {
1060            vec![]
1061        };
1062        let all_betas = if request_betas.is_empty() {
1063            self.collect_betas(None, &auto_betas)
1064        } else {
1065            self.collect_betas(Some(&request_betas), &auto_betas)
1066        };
1067        let headers = self.headers_with_betas(&all_betas);
1068
1069        let result = self
1070            .retry_with_backoff(|| async {
1071                let url = self.build_url("messages/batches");
1072                self.execute_post_request(&url, &params, headers.clone())
1073                    .await
1074            })
1075            .await;
1076
1077        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1078        if result.is_err() {
1079            CLIENT_REQUEST_ERRORS.click();
1080        }
1081        result
1082    }
1083
1084    /// Retrieve a Message Batch by ID.
1085    pub async fn get_message_batch(&self, message_batch_id: &str) -> Result<MessageBatch> {
1086        let start = Instant::now();
1087        CLIENT_REQUESTS.click();
1088
1089        let all_betas = self.collect_betas(None, &[]);
1090        let headers = self.headers_with_betas(&all_betas);
1091
1092        let result = self
1093            .retry_with_backoff(|| async {
1094                let url = self.build_url(&format!("messages/batches/{message_batch_id}"));
1095                self.execute_get_request(&url, None, headers.clone()).await
1096            })
1097            .await;
1098
1099        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1100        if result.is_err() {
1101            CLIENT_REQUEST_ERRORS.click();
1102        }
1103        result
1104    }
1105
1106    /// Retrieve a Message Batch by ID.
1107    pub async fn retrieve_message_batch(&self, message_batch_id: &str) -> Result<MessageBatch> {
1108        self.get_message_batch(message_batch_id).await
1109    }
1110
1111    /// List Message Batches in the current Workspace.
1112    pub async fn list_message_batches(
1113        &self,
1114        params: Option<MessageBatchListParams>,
1115    ) -> Result<MessageBatchListResponse> {
1116        let start = Instant::now();
1117        CLIENT_REQUESTS.click();
1118
1119        let request_betas = params.as_ref().and_then(|p| p.betas.as_deref());
1120        let all_betas = self.collect_betas(request_betas, &[]);
1121        let headers = self.headers_with_betas(&all_betas);
1122
1123        let result = self
1124            .retry_with_backoff(|| async {
1125                let url = self.build_url("messages/batches");
1126                let query_params = params.as_ref().map(|p| {
1127                    let mut params = Vec::new();
1128                    if let Some(ref after_id) = p.after_id {
1129                        params.push(("after_id".to_string(), after_id.clone()));
1130                    }
1131                    if let Some(ref before_id) = p.before_id {
1132                        params.push(("before_id".to_string(), before_id.clone()));
1133                    }
1134                    if let Some(limit) = p.limit {
1135                        params.push(("limit".to_string(), limit.to_string()));
1136                    }
1137                    params
1138                });
1139
1140                self.execute_get_request(&url, query_params.as_deref(), headers.clone())
1141                    .await
1142            })
1143            .await;
1144
1145        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1146        if result.is_err() {
1147            CLIENT_REQUEST_ERRORS.click();
1148        }
1149        result
1150    }
1151
1152    /// Cancel a Message Batch that is currently processing.
1153    pub async fn cancel_message_batch(&self, message_batch_id: &str) -> Result<MessageBatch> {
1154        let start = Instant::now();
1155        CLIENT_REQUESTS.click();
1156
1157        let all_betas = self.collect_betas(None, &[]);
1158        let headers = self.headers_with_betas(&all_betas);
1159
1160        let result = self
1161            .retry_with_backoff(|| async {
1162                let url = self.build_url(&format!("messages/batches/{message_batch_id}/cancel"));
1163                self.execute_post_empty_request(&url, headers.clone()).await
1164            })
1165            .await;
1166
1167        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1168        if result.is_err() {
1169            CLIENT_REQUEST_ERRORS.click();
1170        }
1171        result
1172    }
1173
1174    /// Delete a Message Batch after processing has ended.
1175    pub async fn delete_message_batch(
1176        &self,
1177        message_batch_id: &str,
1178    ) -> Result<DeletedMessageBatch> {
1179        let start = Instant::now();
1180        CLIENT_REQUESTS.click();
1181
1182        let all_betas = self.collect_betas(None, &[]);
1183        let headers = self.headers_with_betas(&all_betas);
1184
1185        let result = self
1186            .retry_with_backoff(|| async {
1187                let url = self.build_url(&format!("messages/batches/{message_batch_id}"));
1188                self.execute_delete_request(&url, headers.clone()).await
1189            })
1190            .await;
1191
1192        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1193        if result.is_err() {
1194            CLIENT_REQUEST_ERRORS.click();
1195        }
1196        result
1197    }
1198
1199    /// Stream results for an ended Message Batch as JSONL records.
1200    pub async fn stream_message_batch_results(
1201        &self,
1202        message_batch_id: &str,
1203    ) -> Result<impl Stream<Item = Result<MessageBatchResult>> + use<>> {
1204        let start = Instant::now();
1205        CLIENT_REQUESTS.click();
1206
1207        let all_betas = self.collect_betas(None, &[]);
1208        let headers = self.headers_with_betas(&all_betas);
1209
1210        let response = self
1211            .retry_with_backoff(|| async {
1212                let url = self.build_url(&format!("messages/batches/{message_batch_id}/results"));
1213                self.execute_get_stream_request(&url, headers.clone()).await
1214            })
1215            .await;
1216
1217        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1218        let response = match response {
1219            Ok(response) => response,
1220            Err(err) => {
1221                CLIENT_REQUEST_ERRORS.click();
1222                return Err(err);
1223            }
1224        };
1225
1226        Ok(process_message_batch_result_jsonl(response.bytes_stream()))
1227    }
1228
1229    /// List available models from the API.
1230    ///
1231    /// Returns a paginated list of all available models. Use the parameters to control
1232    /// pagination and filter results.
1233    pub async fn list_models(&self, params: Option<ModelListParams>) -> Result<ModelListResponse> {
1234        let start = Instant::now();
1235        CLIENT_REQUESTS.click();
1236
1237        // Collect betas: client defaults + per-request from ModelListParams
1238        let request_betas = params.as_ref().and_then(|p| p.betas.as_deref());
1239        let all_betas = self.collect_betas(request_betas, &[]);
1240        let headers = self.headers_with_betas(&all_betas);
1241
1242        let result = self
1243            .retry_with_backoff(|| async {
1244                let url = self.build_url("models");
1245
1246                let query_params = params.as_ref().map(|p| {
1247                    let mut params = Vec::new();
1248                    if let Some(ref after_id) = p.after_id {
1249                        params.push(("after_id".to_string(), after_id.clone()));
1250                    }
1251                    if let Some(ref before_id) = p.before_id {
1252                        params.push(("before_id".to_string(), before_id.clone()));
1253                    }
1254                    if let Some(limit) = p.limit {
1255                        params.push(("limit".to_string(), limit.to_string()));
1256                    }
1257                    params
1258                });
1259
1260                self.execute_get_request(&url, query_params.as_deref(), headers.clone())
1261                    .await
1262            })
1263            .await;
1264
1265        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1266        if result.is_err() {
1267            CLIENT_REQUEST_ERRORS.click();
1268        }
1269        result
1270    }
1271
1272    /// Retrieve information about a specific model.
1273    ///
1274    /// Returns detailed information about the specified model, including its
1275    /// ID, creation date, display name, and type.
1276    pub async fn get_model(&self, model_id: &str) -> Result<ModelInfo> {
1277        let start = Instant::now();
1278        CLIENT_REQUESTS.click();
1279        let result = self
1280            .retry_with_backoff(|| async {
1281                let url = self.build_url(&format!("models/{}", model_id));
1282                self.execute_get_request(&url, None, None).await
1283            })
1284            .await;
1285
1286        CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1287        if result.is_err() {
1288            CLIENT_REQUEST_ERRORS.click();
1289        }
1290        result
1291    }
1292}
1293
1294#[cfg(test)]
1295mod tests {
1296    use super::*;
1297    use crate::{
1298        KnownModel, MessageBatchCreateParams, MessageBatchCreateRequest, MessageBatchListParams,
1299        MessageBatchResultVariant, MessageParam, Model,
1300    };
1301    use futures::StreamExt;
1302    use serde_json::Value;
1303    use std::sync::Arc;
1304    use std::sync::atomic::{AtomicUsize, Ordering};
1305    use tokio::io::{AsyncReadExt, AsyncWriteExt};
1306    use tokio::net::TcpListener;
1307
1308    fn request_headers_end(buffer: &[u8]) -> Option<usize> {
1309        buffer.windows(4).position(|window| window == b"\r\n\r\n")
1310    }
1311
1312    async fn read_http_request_bytes(socket: &mut tokio::net::TcpStream) -> Vec<u8> {
1313        let mut buffer = Vec::new();
1314        let mut chunk = [0_u8; 1024];
1315        loop {
1316            let read = socket.read(&mut chunk).await.unwrap();
1317            assert!(
1318                read > 0,
1319                "client closed the connection before sending headers"
1320            );
1321            buffer.extend_from_slice(&chunk[..read]);
1322            if request_headers_end(&buffer).is_some() {
1323                break;
1324            }
1325        }
1326
1327        let headers_end = request_headers_end(&buffer).unwrap();
1328        let headers = String::from_utf8_lossy(&buffer[..headers_end]);
1329        let content_length = headers
1330            .lines()
1331            .find_map(|line| {
1332                let mut parts = line.splitn(2, ':');
1333                let name = parts.next()?.trim();
1334                let value = parts.next()?.trim();
1335                name.eq_ignore_ascii_case("content-length")
1336                    .then(|| value.parse::<usize>().ok())
1337                    .flatten()
1338            })
1339            .unwrap_or(0);
1340
1341        while buffer.len() - (headers_end + 4) < content_length {
1342            let read = socket.read(&mut chunk).await.unwrap();
1343            assert!(
1344                read > 0,
1345                "client closed the connection before sending the full body"
1346            );
1347            buffer.extend_from_slice(&chunk[..read]);
1348        }
1349        buffer
1350    }
1351
1352    async fn read_http_request(socket: &mut tokio::net::TcpStream) {
1353        read_http_request_bytes(socket).await;
1354    }
1355
1356    fn split_http_request(request: &[u8]) -> (String, String) {
1357        let headers_end = request_headers_end(request).unwrap();
1358        let headers = String::from_utf8_lossy(&request[..headers_end]).to_string();
1359        let body = String::from_utf8_lossy(&request[headers_end + 4..]).to_string();
1360        (headers, body)
1361    }
1362
1363    fn request_target(headers: &str) -> &str {
1364        headers
1365            .lines()
1366            .next()
1367            .and_then(|line| line.split_whitespace().nth(1))
1368            .unwrap()
1369    }
1370
1371    fn request_method(headers: &str) -> &str {
1372        headers
1373            .lines()
1374            .next()
1375            .and_then(|line| line.split_whitespace().next())
1376            .unwrap()
1377    }
1378
1379    fn header_value<'a>(headers: &'a str, name: &str) -> Option<&'a str> {
1380        headers.lines().find_map(|line| {
1381            let mut parts = line.splitn(2, ':');
1382            let header_name = parts.next()?.trim();
1383            let value = parts.next()?.trim();
1384            header_name.eq_ignore_ascii_case(name).then_some(value)
1385        })
1386    }
1387
1388    async fn write_json_response(socket: &mut tokio::net::TcpStream, body: &str) {
1389        let response = format!(
1390            "HTTP/1.1 200 OK\r\n\
1391Content-Type: application/json\r\n\
1392Content-Length: {}\r\n\
1393Connection: close\r\n\r\n{}",
1394            body.len(),
1395            body
1396        );
1397        socket.write_all(response.as_bytes()).await.unwrap();
1398        socket.shutdown().await.unwrap();
1399    }
1400
1401    async fn start_test_server<F, Fut>(handler: F) -> String
1402    where
1403        F: FnOnce(tokio::net::TcpStream) -> Fut + Send + 'static,
1404        Fut: std::future::Future<Output = ()> + Send + 'static,
1405    {
1406        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1407        let address = listener.local_addr().unwrap();
1408        tokio::spawn(async move {
1409            let (socket, _) = listener.accept().await.unwrap();
1410            socket.set_nodelay(true).unwrap();
1411            handler(socket).await;
1412        });
1413        format!("http://{}", address)
1414    }
1415
1416    #[tokio::test]
1417    async fn retry_logic_with_backoff() {
1418        let client = Anthropic {
1419            api_key: "test".to_string(),
1420            client: ReqwestClient::new(),
1421            base_url: "http://localhost".to_string(),
1422            timeout: Duration::from_secs(1),
1423            max_retries: 2,
1424            throughput_ops_sec: 1.0 / 60.0,
1425            reserve_capacity: 1.0 / 60.0,
1426            cached_headers: Arc::new(HeaderMap::new()),
1427            default_betas: Vec::new(),
1428        };
1429
1430        let attempt_counter = Arc::new(AtomicUsize::new(0));
1431        let counter_clone = attempt_counter.clone();
1432
1433        let result = client
1434            .retry_with_backoff(|| {
1435                let counter = counter_clone.clone();
1436                async move {
1437                    let attempt = counter.fetch_add(1, Ordering::SeqCst);
1438                    match attempt {
1439                        0 | 1 => Err(Error::rate_limit("Rate limited", Some(1))),
1440                        _ => Ok("success".to_string()),
1441                    }
1442                }
1443            })
1444            .await;
1445
1446        assert!(result.is_ok());
1447        assert_eq!(result.unwrap(), "success");
1448        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1449    }
1450
1451    #[tokio::test]
1452    async fn retry_logic_with_non_retryable_error() {
1453        let client = Anthropic {
1454            api_key: "test".to_string(),
1455            client: ReqwestClient::new(),
1456            base_url: "http://localhost".to_string(),
1457            timeout: Duration::from_secs(1),
1458            max_retries: 2,
1459            throughput_ops_sec: 1.0 / 60.0,
1460            reserve_capacity: 1.0 / 60.0,
1461            cached_headers: Arc::new(HeaderMap::new()),
1462            default_betas: Vec::new(),
1463        };
1464
1465        let attempt_counter = Arc::new(AtomicUsize::new(0));
1466        let counter_clone = attempt_counter.clone();
1467
1468        let result: Result<String> = client
1469            .retry_with_backoff(|| {
1470                let counter = counter_clone.clone();
1471                async move {
1472                    counter.fetch_add(1, Ordering::SeqCst);
1473                    Err(Error::authentication("Invalid API key"))
1474                }
1475            })
1476            .await;
1477
1478        assert!(result.is_err());
1479        assert!(result.unwrap_err().is_authentication());
1480        // Should only attempt once since authentication errors are not retryable
1481        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
1482    }
1483
1484    #[tokio::test]
1485    async fn retry_logic_max_retries_exceeded() {
1486        let client = Anthropic {
1487            api_key: "test".to_string(),
1488            client: ReqwestClient::new(),
1489            base_url: "http://localhost".to_string(),
1490            timeout: Duration::from_secs(1),
1491            max_retries: 2,
1492            throughput_ops_sec: 1.0 / 60.0,
1493            reserve_capacity: 1.0 / 60.0,
1494            cached_headers: Arc::new(HeaderMap::new()),
1495            default_betas: Vec::new(),
1496        };
1497
1498        let attempt_counter = Arc::new(AtomicUsize::new(0));
1499        let counter_clone = attempt_counter.clone();
1500
1501        let result: Result<String> = client
1502            .retry_with_backoff(|| {
1503                let counter = counter_clone.clone();
1504                async move {
1505                    counter.fetch_add(1, Ordering::SeqCst);
1506                    Err(Error::rate_limit("Always rate limited", Some(1)))
1507                }
1508            })
1509            .await;
1510
1511        assert!(result.is_err());
1512        assert!(result.unwrap_err().is_rate_limit());
1513        // Should attempt max_retries + 1 times (3 total: initial + 2 retries)
1514        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1515    }
1516
1517    #[tokio::test]
1518    async fn error_529_is_retryable() {
1519        // Test that 529 errors are properly mapped to rate_limit and are retryable
1520        let client = Anthropic {
1521            api_key: "test".to_string(),
1522            client: ReqwestClient::new(),
1523            base_url: "http://localhost".to_string(),
1524            timeout: Duration::from_secs(1),
1525            max_retries: 2,
1526            throughput_ops_sec: 1.0 / 60.0,
1527            reserve_capacity: 1.0 / 60.0,
1528            cached_headers: Arc::new(HeaderMap::new()),
1529            default_betas: Vec::new(),
1530        };
1531
1532        let attempt_counter = Arc::new(AtomicUsize::new(0));
1533        let counter_clone = attempt_counter.clone();
1534
1535        let result = client
1536            .retry_with_backoff(|| {
1537                let counter = counter_clone.clone();
1538                async move {
1539                    let attempt = counter.fetch_add(1, Ordering::SeqCst);
1540                    match attempt {
1541                        0 | 1 => {
1542                            // Simulate a 529 overloaded error
1543                            Err(Error::api(
1544                                529,
1545                                Some("overloaded_error".to_string()),
1546                                "Overloaded".to_string(),
1547                                None,
1548                            ))
1549                        }
1550                        _ => Ok("success".to_string()),
1551                    }
1552                }
1553            })
1554            .await;
1555
1556        assert!(result.is_ok());
1557        assert_eq!(result.unwrap(), "success");
1558        // Should retry: initial attempt + 2 retries = 3 total
1559        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1560    }
1561
1562    #[test]
1563    fn error_529_mapped_correctly() {
1564        // Test that a 529 API error is correctly identified as retryable
1565        let error = Error::api(
1566            529,
1567            Some("overloaded_error".to_string()),
1568            "Overloaded".to_string(),
1569            None,
1570        );
1571        assert!(error.is_retryable());
1572
1573        // Test that rate_limit error (which 529 now maps to) is also retryable
1574        let rate_limit_error = Error::rate_limit("Overloaded", Some(5));
1575        assert!(rate_limit_error.is_retryable());
1576    }
1577
1578    #[test]
1579    fn resolve_api_key_regular_value() {
1580        let result = Anthropic::resolve_api_key("sk-test-key-123");
1581        assert!(result.is_ok());
1582        assert_eq!(result.unwrap(), "sk-test-key-123");
1583    }
1584
1585    #[test]
1586    fn resolve_api_key_file_url_absolute() {
1587        let test_dir = std::env::temp_dir().join(format!("claudius_test_{}", std::process::id()));
1588        std::fs::create_dir_all(&test_dir).unwrap();
1589        let test_file = test_dir.join("test_api_key.txt");
1590        std::fs::write(&test_file, "sk-test-from-file-123\n").unwrap();
1591
1592        let file_url = format!("file://{}", test_file.display());
1593        let result = Anthropic::resolve_api_key(&file_url);
1594
1595        std::fs::remove_dir_all(&test_dir).unwrap();
1596
1597        assert!(result.is_ok());
1598        assert_eq!(result.unwrap(), "sk-test-from-file-123");
1599    }
1600
1601    #[test]
1602    fn resolve_api_key_file_url_relative() {
1603        let test_file = "test_relative_key.txt";
1604        std::fs::write(test_file, "sk-relative-key-456\n").unwrap();
1605
1606        let file_url = format!("file://{}", test_file);
1607        let result = Anthropic::resolve_api_key(&file_url);
1608
1609        std::fs::remove_file(test_file).unwrap();
1610
1611        assert!(result.is_ok());
1612        assert_eq!(result.unwrap(), "sk-relative-key-456");
1613    }
1614
1615    #[test]
1616    fn resolve_api_key_file_url_nonexistent() {
1617        let result = Anthropic::resolve_api_key("file:///nonexistent/path/to/key.txt");
1618        assert!(result.is_err());
1619
1620        let error = result.unwrap_err();
1621        assert!(error.is_validation());
1622        assert!(format!("{}", error).contains("Failed to read API key from file"));
1623    }
1624
1625    #[test]
1626    fn resolve_api_key_file_url_with_whitespace() {
1627        let test_file = "test_whitespace_key.txt";
1628        std::fs::write(test_file, "  sk-whitespace-key-789  \n  ").unwrap();
1629
1630        let file_url = format!("file://{}", test_file);
1631        let result = Anthropic::resolve_api_key(&file_url);
1632
1633        std::fs::remove_file(test_file).unwrap();
1634
1635        assert!(result.is_ok());
1636        assert_eq!(result.unwrap(), "sk-whitespace-key-789");
1637    }
1638
1639    #[test]
1640    fn client_builder_methods() {
1641        let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1642
1643        // Test builder pattern methods
1644        let configured_client = client
1645            .with_base_url("https://custom.api.com".to_string())
1646            .with_max_retries(5)
1647            .with_backoff_params(2.0, 1.0);
1648
1649        assert_eq!(configured_client.base_url, "https://custom.api.com");
1650        assert_eq!(configured_client.max_retries, 5);
1651        assert_eq!(configured_client.throughput_ops_sec, 2.0);
1652        assert_eq!(configured_client.reserve_capacity, 1.0);
1653    }
1654
1655    #[test]
1656    fn build_url_default_base() {
1657        let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1658        // Default base URL: https://api.anthropic.com
1659        assert_eq!(
1660            client.build_url("messages"),
1661            "https://api.anthropic.com/v1/messages"
1662        );
1663        assert_eq!(
1664            client.build_url("messages/count_tokens"),
1665            "https://api.anthropic.com/v1/messages/count_tokens"
1666        );
1667        assert_eq!(
1668            client.build_url("models"),
1669            "https://api.anthropic.com/v1/models"
1670        );
1671    }
1672
1673    #[test]
1674    fn build_url_custom_base_without_trailing_slash() {
1675        let client = Anthropic::new(Some("test_key".to_string()))
1676            .unwrap()
1677            .with_base_url("https://api.minimax.io/anthropic".to_string());
1678        assert_eq!(
1679            client.build_url("messages"),
1680            "https://api.minimax.io/anthropic/v1/messages"
1681        );
1682    }
1683
1684    #[test]
1685    fn build_url_custom_base_with_trailing_slash() {
1686        let client = Anthropic::new(Some("test_key".to_string()))
1687            .unwrap()
1688            .with_base_url("https://api.minimax.io/anthropic/".to_string());
1689        assert_eq!(
1690            client.build_url("messages"),
1691            "https://api.minimax.io/anthropic/v1/messages"
1692        );
1693    }
1694
1695    #[test]
1696    fn build_url_minimax_china() {
1697        let client = Anthropic::new(Some("test_key".to_string()))
1698            .unwrap()
1699            .with_base_url("https://api.minimaxi.com/anthropic".to_string());
1700        assert_eq!(
1701            client.build_url("messages"),
1702            "https://api.minimaxi.com/anthropic/v1/messages"
1703        );
1704        assert_eq!(
1705            client.build_url(&format!("models/{}", "claude-3-opus")),
1706            "https://api.minimaxi.com/anthropic/v1/models/claude-3-opus"
1707        );
1708    }
1709
1710    #[test]
1711    fn client_timeout_configuration() {
1712        let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1713        let timeout = Duration::from_secs(30);
1714
1715        let configured_client = client.with_timeout(timeout).unwrap();
1716        assert_eq!(configured_client.timeout, timeout);
1717    }
1718
1719    #[test]
1720    fn client_cached_headers_performance() {
1721        let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1722
1723        // Test that headers are cached and cloning is cheap
1724        let headers1 = client.default_headers();
1725        let headers2 = client.default_headers();
1726
1727        assert_eq!(headers1.len(), headers2.len());
1728        assert!(headers1.contains_key("x-api-key"));
1729        assert!(headers1.contains_key("anthropic-version"));
1730        assert!(headers1.contains_key("content-type"));
1731    }
1732
1733    #[test]
1734    fn request_error_mapping() {
1735        let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1736
1737        // Test different types of reqwest errors are mapped correctly
1738        // Note: These are unit tests for the mapping logic, not integration tests
1739        let _timeout = Duration::from_secs(30);
1740        assert_eq!(client.timeout, DEFAULT_TIMEOUT); // Should use default initially
1741    }
1742
1743    #[tokio::test]
1744    async fn concurrent_retry_safety() {
1745        use std::sync::atomic::{AtomicUsize, Ordering};
1746        use tokio::spawn;
1747
1748        let client = Anthropic {
1749            api_key: "test".to_string(),
1750            client: ReqwestClient::new(),
1751            base_url: "http://localhost".to_string(),
1752            timeout: Duration::from_secs(1),
1753            max_retries: 1,
1754            throughput_ops_sec: 1.0,
1755            reserve_capacity: 1.0,
1756            cached_headers: Arc::new(HeaderMap::new()),
1757            default_betas: Vec::new(),
1758        };
1759
1760        let attempt_counter = Arc::new(AtomicUsize::new(0));
1761        let mut handles = vec![];
1762
1763        // Spawn multiple concurrent retry operations
1764        for _ in 0..3 {
1765            let client_clone = client.clone();
1766            let counter_clone = attempt_counter.clone();
1767
1768            let handle = spawn(async move {
1769                client_clone
1770                    .retry_with_backoff(|| {
1771                        let counter = counter_clone.clone();
1772                        async move {
1773                            counter.fetch_add(1, Ordering::SeqCst);
1774                            Ok::<String, Error>("success".to_string())
1775                        }
1776                    })
1777                    .await
1778            });
1779            handles.push(handle);
1780        }
1781
1782        // Wait for all operations to complete
1783        for handle in handles {
1784            let result = handle.await.unwrap();
1785            assert!(result.is_ok());
1786        }
1787
1788        // Verify all operations executed
1789        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1790    }
1791
1792    fn test_client() -> Anthropic {
1793        Anthropic {
1794            api_key: "test".to_string(),
1795            client: ReqwestClient::new(),
1796            base_url: "http://localhost".to_string(),
1797            timeout: Duration::from_secs(1),
1798            max_retries: 0,
1799            throughput_ops_sec: 1.0 / 60.0,
1800            reserve_capacity: 1.0 / 60.0,
1801            cached_headers: Arc::new(HeaderMap::new()),
1802            default_betas: Vec::new(),
1803        }
1804    }
1805
1806    #[test]
1807    fn collect_betas_empty() {
1808        let client = test_client();
1809        let result = client.collect_betas(None, &[]);
1810        assert!(result.is_empty());
1811    }
1812
1813    #[test]
1814    fn collect_betas_client_defaults_only() {
1815        let client = test_client().with_default_betas(["alpha", "bravo"]);
1816        let result = client.collect_betas(None, &[]);
1817        assert_eq!(result, vec!["alpha", "bravo"]);
1818    }
1819
1820    #[test]
1821    fn collect_betas_request_only() {
1822        let client = test_client();
1823        let request = vec!["compact-2026-01-12".to_string()];
1824        let result = client.collect_betas(Some(&request), &[]);
1825        assert_eq!(result, vec!["compact-2026-01-12"]);
1826    }
1827
1828    #[test]
1829    fn collect_betas_auto_only() {
1830        let client = test_client();
1831        let result = client.collect_betas(None, &[STRUCTURED_OUTPUTS_BETA]);
1832        assert_eq!(result, vec![STRUCTURED_OUTPUTS_BETA]);
1833    }
1834
1835    #[test]
1836    fn collect_betas_merges_all_sources() {
1837        let client = test_client().with_default_betas(["default-beta"]);
1838        let request = vec!["request-beta".to_string()];
1839        let result = client.collect_betas(Some(&request), &["auto-beta"]);
1840        assert_eq!(result, vec!["default-beta", "request-beta", "auto-beta"]);
1841    }
1842
1843    #[test]
1844    fn collect_betas_deduplicates() {
1845        let client = test_client().with_default_betas(["shared-beta", "default-only"]);
1846        let request = vec!["shared-beta".to_string(), "request-only".to_string()];
1847        let result = client.collect_betas(Some(&request), &["shared-beta"]);
1848        assert_eq!(result, vec!["shared-beta", "default-only", "request-only"]);
1849    }
1850
1851    #[test]
1852    fn headers_with_betas_none_when_empty() {
1853        let client = test_client();
1854        assert!(client.headers_with_betas(&[]).is_none());
1855    }
1856
1857    #[test]
1858    fn headers_with_betas_joins_with_comma() {
1859        let client = test_client();
1860        let betas = vec!["alpha".to_string(), "bravo".to_string()];
1861        let headers = client.headers_with_betas(&betas).unwrap();
1862        assert_eq!(
1863            headers.get("anthropic-beta").unwrap().to_str().unwrap(),
1864            "alpha, bravo"
1865        );
1866    }
1867
1868    #[test]
1869    fn with_default_betas_builder() {
1870        let client = test_client().with_default_betas(["a", "b", "c"]);
1871        assert_eq!(client.default_betas, vec!["a", "b", "c"]);
1872    }
1873
1874    #[tokio::test]
1875    async fn create_message_batch_posts_expected_body_and_betas() {
1876        let response_body = r#"{
1877            "id": "msgbatch_123",
1878            "type": "message_batch",
1879            "processing_status": "in_progress",
1880            "request_counts": {
1881                "processing": 1,
1882                "succeeded": 0,
1883                "errored": 0,
1884                "canceled": 0,
1885                "expired": 0
1886            },
1887            "ended_at": null,
1888            "created_at": "2024-09-24T18:37:24Z",
1889            "expires_at": "2024-09-25T18:37:24Z",
1890            "cancel_initiated_at": null,
1891            "results_url": null
1892        }"#;
1893
1894        let base_url = start_test_server(move |mut socket| async move {
1895            let request = read_http_request_bytes(&mut socket).await;
1896            let (headers, body) = split_http_request(&request);
1897            assert_eq!(request_method(&headers), "POST");
1898            assert_eq!(request_target(&headers), "/v1/messages/batches");
1899            assert_eq!(
1900                header_value(&headers, "anthropic-beta"),
1901                Some("default-beta, batch-beta, request-beta")
1902            );
1903
1904            let body_json: Value = serde_json::from_str(&body).unwrap();
1905            assert_eq!(body_json["requests"][0]["custom_id"], "request-1");
1906            assert_eq!(
1907                body_json["requests"][0]["params"]["model"],
1908                "claude-haiku-4-5"
1909            );
1910            assert!(
1911                body_json["requests"][0]["params"].get("stream").is_none(),
1912                "stream must not be serialized inside batch params"
1913            );
1914            assert!(body_json.get("betas").is_none());
1915
1916            write_json_response(&mut socket, response_body).await;
1917        })
1918        .await;
1919
1920        let client = Anthropic::new(Some("test_key".to_string()))
1921            .unwrap()
1922            .with_base_url(base_url)
1923            .with_default_betas(["default-beta"]);
1924        let request_params = MessageCreateParams::new(
1925            16,
1926            vec![MessageParam::user("ping")],
1927            Model::Known(KnownModel::ClaudeHaiku45),
1928        )
1929        .with_beta("request-beta");
1930        let params = MessageBatchCreateParams::new(vec![MessageBatchCreateRequest::new(
1931            "request-1",
1932            request_params,
1933        )])
1934        .with_beta("batch-beta");
1935
1936        let batch = client.create_message_batch(params).await.unwrap();
1937        assert_eq!(batch.id, "msgbatch_123");
1938    }
1939
1940    #[tokio::test]
1941    async fn get_message_batch_uses_retrieve_endpoint() {
1942        let response_body = r#"{
1943            "id": "msgbatch_123",
1944            "type": "message_batch",
1945            "processing_status": "ended",
1946            "request_counts": {
1947                "processing": 0,
1948                "succeeded": 1,
1949                "errored": 0,
1950                "canceled": 0,
1951                "expired": 0
1952            },
1953            "ended_at": "2024-09-24T18:39:24Z",
1954            "created_at": "2024-09-24T18:37:24Z",
1955            "expires_at": "2024-09-25T18:37:24Z",
1956            "cancel_initiated_at": null,
1957            "results_url": "https://api.anthropic.com/results"
1958        }"#;
1959
1960        let base_url = start_test_server(move |mut socket| async move {
1961            let request = read_http_request_bytes(&mut socket).await;
1962            let (headers, body) = split_http_request(&request);
1963            assert_eq!(request_method(&headers), "GET");
1964            assert_eq!(
1965                request_target(&headers),
1966                "/v1/messages/batches/msgbatch_123"
1967            );
1968            assert!(body.is_empty());
1969            write_json_response(&mut socket, response_body).await;
1970        })
1971        .await;
1972
1973        let client = Anthropic::new(Some("test_key".to_string()))
1974            .unwrap()
1975            .with_base_url(base_url);
1976        let batch = client.get_message_batch("msgbatch_123").await.unwrap();
1977        assert_eq!(batch.id, "msgbatch_123");
1978        assert_eq!(batch.request_counts.succeeded, 1);
1979    }
1980
1981    #[tokio::test]
1982    async fn list_message_batches_sends_pagination_query_and_beta() {
1983        let response_body = r#"{
1984            "data": [],
1985            "has_more": false,
1986            "first_id": null,
1987            "last_id": null
1988        }"#;
1989
1990        let base_url = start_test_server(move |mut socket| async move {
1991            let request = read_http_request_bytes(&mut socket).await;
1992            let (headers, body) = split_http_request(&request);
1993            assert_eq!(request_method(&headers), "GET");
1994            assert_eq!(
1995                request_target(&headers),
1996                "/v1/messages/batches?after_id=msgbatch_a&limit=20"
1997            );
1998            assert_eq!(header_value(&headers, "anthropic-beta"), Some("list-beta"));
1999            assert!(body.is_empty());
2000            write_json_response(&mut socket, response_body).await;
2001        })
2002        .await;
2003
2004        let client = Anthropic::new(Some("test_key".to_string()))
2005            .unwrap()
2006            .with_base_url(base_url);
2007        let params = MessageBatchListParams::new()
2008            .with_after_id("msgbatch_a")
2009            .with_limit(20)
2010            .with_beta("list-beta");
2011        let page = client.list_message_batches(Some(params)).await.unwrap();
2012        assert!(page.data.is_empty());
2013        assert!(!page.has_more);
2014    }
2015
2016    #[tokio::test]
2017    async fn cancel_message_batch_posts_empty_body() {
2018        let response_body = r#"{
2019            "id": "msgbatch_123",
2020            "type": "message_batch",
2021            "processing_status": "canceling",
2022            "request_counts": {
2023                "processing": 1,
2024                "succeeded": 0,
2025                "errored": 0,
2026                "canceled": 0,
2027                "expired": 0
2028            },
2029            "ended_at": null,
2030            "created_at": "2024-09-24T18:37:24Z",
2031            "expires_at": "2024-09-25T18:37:24Z",
2032            "cancel_initiated_at": "2024-09-24T18:39:03Z",
2033            "results_url": null
2034        }"#;
2035
2036        let base_url = start_test_server(move |mut socket| async move {
2037            let request = read_http_request_bytes(&mut socket).await;
2038            let (headers, body) = split_http_request(&request);
2039            assert_eq!(request_method(&headers), "POST");
2040            assert_eq!(
2041                request_target(&headers),
2042                "/v1/messages/batches/msgbatch_123/cancel"
2043            );
2044            assert!(body.is_empty());
2045            write_json_response(&mut socket, response_body).await;
2046        })
2047        .await;
2048
2049        let client = Anthropic::new(Some("test_key".to_string()))
2050            .unwrap()
2051            .with_base_url(base_url);
2052        let batch = client.cancel_message_batch("msgbatch_123").await.unwrap();
2053        assert_eq!(batch.id, "msgbatch_123");
2054    }
2055
2056    #[tokio::test]
2057    async fn delete_message_batch_uses_delete_endpoint() {
2058        let response_body = r#"{
2059            "id": "msgbatch_123",
2060            "type": "message_batch_deleted"
2061        }"#;
2062
2063        let base_url = start_test_server(move |mut socket| async move {
2064            let request = read_http_request_bytes(&mut socket).await;
2065            let (headers, body) = split_http_request(&request);
2066            assert_eq!(request_method(&headers), "DELETE");
2067            assert_eq!(
2068                request_target(&headers),
2069                "/v1/messages/batches/msgbatch_123"
2070            );
2071            assert!(body.is_empty());
2072            write_json_response(&mut socket, response_body).await;
2073        })
2074        .await;
2075
2076        let client = Anthropic::new(Some("test_key".to_string()))
2077            .unwrap()
2078            .with_base_url(base_url);
2079        let deleted = client.delete_message_batch("msgbatch_123").await.unwrap();
2080        assert_eq!(deleted.r#type, "message_batch_deleted");
2081    }
2082
2083    #[tokio::test]
2084    async fn stream_message_batch_results_preserves_jsonl_order_without_trailing_newline() {
2085        let results_body = concat!(
2086            r#"{"custom_id":"second","result":{"type":"expired"}}"#,
2087            "\n",
2088            r#"{"custom_id":"first","result":{"type":"succeeded","message":{"id":"msg_123","type":"message","role":"assistant","model":"claude-haiku-4-5","content":[{"type":"text","text":"ok"}],"stop_reason":"end_turn","stop_sequence":null,"usage":{"input_tokens":1,"output_tokens":1}}}}"#
2089        );
2090
2091        let base_url = start_test_server(move |mut socket| async move {
2092            let request = read_http_request_bytes(&mut socket).await;
2093            let (headers, body) = split_http_request(&request);
2094            assert_eq!(request_method(&headers), "GET");
2095            assert_eq!(
2096                request_target(&headers),
2097                "/v1/messages/batches/msgbatch_123/results"
2098            );
2099            assert!(body.is_empty());
2100
2101            let response_headers = format!(
2102                "HTTP/1.1 200 OK\r\n\
2103Content-Type: application/jsonl\r\n\
2104Content-Length: {}\r\n\
2105Connection: close\r\n\r\n",
2106                results_body.len()
2107            );
2108            socket.write_all(response_headers.as_bytes()).await.unwrap();
2109            let split_at = results_body.find('\n').unwrap() + 1;
2110            socket
2111                .write_all(&results_body.as_bytes()[..split_at])
2112                .await
2113                .unwrap();
2114            socket
2115                .write_all(&results_body.as_bytes()[split_at..])
2116                .await
2117                .unwrap();
2118            socket.shutdown().await.unwrap();
2119        })
2120        .await;
2121
2122        let client = Anthropic::new(Some("test_key".to_string()))
2123            .unwrap()
2124            .with_base_url(base_url);
2125        let stream = client
2126            .stream_message_batch_results("msgbatch_123")
2127            .await
2128            .unwrap();
2129        let mut stream = std::pin::pin!(stream);
2130
2131        let first = stream.next().await.unwrap().unwrap();
2132        assert_eq!(first.custom_id, "second");
2133        assert!(matches!(first.result, MessageBatchResultVariant::Expired));
2134
2135        let second = stream.next().await.unwrap().unwrap();
2136        assert_eq!(second.custom_id, "first");
2137        assert!(matches!(
2138            second.result,
2139            MessageBatchResultVariant::Succeeded { .. }
2140        ));
2141
2142        assert!(stream.next().await.is_none());
2143    }
2144
    /// Verifies that the streaming timeout measures inactivity, not total
    /// elapsed time. The server delivers a single `ping` event in three writes,
    /// each followed by a 40 ms pause, so the event completes roughly 80 ms
    /// after the headers. That exceeds the client's 75 ms timeout overall,
    /// while no single gap does.
    #[tokio::test]
    async fn streaming_timeout_is_inactivity_based() {
        let base_url = start_test_server(|mut socket| async move {
            read_http_request(&mut socket).await;
            // SSE response headers; the event body follows in separate chunks.
            socket
                .write_all(
                    b"HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
Cache-Control: no-cache\r\n\
Connection: close\r\n\r\n",
                )
                .await
                .unwrap();

            // One SSE event split across three flushed writes. Each pause
            // (40 ms) stays below the client's 75 ms timeout.
            for chunk in [
                b"event: ping\n".as_slice(),
                b"data: {}\n".as_slice(),
                b"\n".as_slice(),
            ] {
                socket.write_all(chunk).await.unwrap();
                socket.flush().await.unwrap();
                tokio::time::sleep(Duration::from_millis(40)).await;
            }
        })
        .await;

        // 75 ms is shorter than the whole event's delivery time but longer
        // than any single inter-chunk gap.
        let client = Anthropic::new(Some("test_key".to_string()))
            .unwrap()
            .with_base_url(base_url)
            .with_timeout(Duration::from_millis(75))
            .unwrap();
        let params = MessageCreateParams::new_streaming(
            16,
            vec![MessageParam::user("ping")],
            Model::Known(KnownModel::ClaudeHaiku45),
        );

        // The slowly delivered event must still arrive as a `Ping`, not a timeout.
        let mut stream = std::pin::pin!(client.stream(&params).await.unwrap());
        let first = stream.next().await.unwrap().unwrap();
        assert_eq!(first, MessageStreamEvent::Ping);
    }
2186
    /// Verifies that a stream which stalls mid-event surfaces `Error::Timeout`.
    /// The server sends the headers and a partial `ping` event, then stays
    /// silent for 120 ms, longer than the client's 50 ms timeout. The error's
    /// display text must mention "operation timed out".
    #[tokio::test]
    async fn streaming_stall_reports_timeout_error() {
        let base_url = start_test_server(|mut socket| async move {
            read_http_request(&mut socket).await;
            // Headers plus only the `event:` line of the SSE event. The `data:`
            // line and the terminating blank line are withheld.
            socket
                .write_all(
                    b"HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
Cache-Control: no-cache\r\n\
Connection: close\r\n\r\n\
event: ping\n",
                )
                .await
                .unwrap();
            socket.flush().await.unwrap();
            // Stall longer than the 50 ms client timeout before finishing the event.
            tokio::time::sleep(Duration::from_millis(120)).await;
            socket.write_all(b"data: {}\n\n").await.unwrap();
            socket.flush().await.unwrap();
        })
        .await;

        let client = Anthropic::new(Some("test_key".to_string()))
            .unwrap()
            .with_base_url(base_url)
            .with_timeout(Duration::from_millis(50))
            .unwrap();
        let params = MessageCreateParams::new_streaming(
            16,
            vec![MessageParam::user("ping")],
            Model::Known(KnownModel::ClaudeHaiku45),
        );

        // The first item yielded must be the timeout error, not an event.
        let mut stream = std::pin::pin!(client.stream(&params).await.unwrap());
        let err = stream.next().await.unwrap().unwrap_err();
        assert!(matches!(err, Error::Timeout { .. }));
        assert!(err.to_string().contains("operation timed out"));
    }
2224
    /// Verifies that a stall while reading a non-streaming response body
    /// surfaces `Error::Timeout`. `Content-Length` advertises the full message,
    /// but only a prefix arrives before a 120 ms pause, which exceeds the
    /// client's 50 ms timeout.
    #[tokio::test]
    async fn non_streaming_body_stall_reports_timeout_error() {
        let base_url = start_test_server(|mut socket| async move {
            read_http_request(&mut socket).await;
            // One JSON message split mid-string, so the prefix alone is incomplete.
            let body_prefix = b"{\"id\":\"msg_1\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"claude-haiku-4-5-20251001\",\"content\":[{\"type\":\"text\",\"text\":\"hel";
            let body_suffix = b"lo\"}],\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":1,\"output_tokens\":1}}";
            // Content-Length covers both halves, so the client keeps waiting
            // for the suffix after the prefix arrives.
            let headers = format!(
                "HTTP/1.1 200 OK\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\r\n",
                body_prefix.len() + body_suffix.len()
            );
            socket
                .write_all(headers.as_bytes())
                .await
                .unwrap();
            socket.write_all(body_prefix).await.unwrap();
            socket.flush().await.unwrap();
            // Stall longer than the 50 ms client timeout mid-body.
            tokio::time::sleep(Duration::from_millis(120)).await;
            socket.write_all(body_suffix).await.unwrap();
            socket.flush().await.unwrap();
            socket.shutdown().await.unwrap();
        })
        .await;

        // Retries are disabled (max_retries = 0), so the timeout is returned
        // directly instead of being retried.
        let client = Anthropic::new(Some("test_key".to_string()))
            .unwrap()
            .with_base_url(base_url)
            .with_timeout(Duration::from_millis(50))
            .unwrap()
            .with_max_retries(0);
        let params = MessageCreateParams::new(
            16,
            vec![MessageParam::user("ping")],
            Model::Known(KnownModel::ClaudeHaiku45),
        );

        let err = client.send(params).await.unwrap_err();
        assert!(matches!(err, Error::Timeout { .. }), "{err:?}");
    }
2266}