1use std::collections::VecDeque;
2use std::env;
3use std::error::Error as StdError;
4use std::fs;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use std::time::{Duration, Instant};
9
10use bytes::Bytes;
11use futures::{Stream, StreamExt, stream};
12use reqwest::header::{HeaderMap, HeaderValue};
13use reqwest::{Client as ReqwestClient, Response, header};
14use serde::Deserialize;
15use tokio::time::sleep;
16
17use crate::AccumulatingStream;
18use crate::backoff::ExponentialBackoff;
19use crate::client_logger::ClientLogger;
20use crate::error::{Error, Result};
21use crate::observability::{
22 CLIENT_REQUEST_DURATION, CLIENT_REQUEST_ERRORS, CLIENT_REQUEST_RETRIES, CLIENT_REQUESTS,
23 CLIENT_RETRY_BACKOFF,
24};
25use crate::sse::process_message_stream_sse;
26use crate::types::{
27 DeletedMessageBatch, Message, MessageBatch, MessageBatchCreateParams, MessageBatchListParams,
28 MessageBatchListResponse, MessageBatchResult, MessageCountTokensParams, MessageCreateParams,
29 MessageStreamEvent, MessageTokensCount, ModelInfo, ModelListParams, ModelListResponse,
30};
31
/// Stream adapter that forwards message stream events while reporting them to a
/// [`ClientLogger`].
pub struct LoggingStream<'a> {
    /// Wrapped event stream that is polled for every item.
    inner: AccumulatingStream,
    /// Logger notified of each successful event and of the final message.
    logger: &'a dyn ClientLogger,
    /// One-shot channel checked once when `inner` ends; `None` after it has been taken.
    receiver: Option<tokio::sync::oneshot::Receiver<Result<Message>>>,
}
42
43impl<'a> LoggingStream<'a> {
44 fn new(
46 inner: AccumulatingStream,
47 receiver: tokio::sync::oneshot::Receiver<Result<Message>>,
48 logger: &'a dyn ClientLogger,
49 ) -> Self {
50 Self {
51 inner,
52 logger,
53 receiver: Some(receiver),
54 }
55 }
56}
57
58impl Stream for LoggingStream<'_> {
59 type Item = Result<MessageStreamEvent>;
60
61 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62 let inner = Pin::new(&mut self.inner);
63 match inner.poll_next(cx) {
64 Poll::Ready(Some(Ok(event))) => {
65 self.logger.log_stream_event(&event);
66 Poll::Ready(Some(Ok(event)))
67 }
68 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
69 Poll::Ready(None) => {
70 if let Some(mut receiver) = self.receiver.take()
72 && let Ok(Ok(ref message)) = receiver.try_recv()
73 {
74 self.logger.log_stream_message(message);
75 }
76 Poll::Ready(None)
77 }
78 Poll::Pending => Poll::Pending,
79 }
80 }
81}
82
/// Base URL used when neither `CLAUDIUS_BASE_URL` nor `ANTHROPIC_BASE_URL` is set.
const DEFAULT_API_URL: &str = "https://api.anthropic.com";
/// Value sent in the `anthropic-version` header of the default headers.
const ANTHROPIC_API_VERSION: &str = "2023-06-01";
/// Timeout applied to a freshly constructed client (connect and read).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
/// Beta flag added automatically when a request reports that it needs structured outputs.
const STRUCTURED_OUTPUTS_BETA: &str = "structured-outputs-2025-11-13";
88
/// Reports whether the `CLAUDIUS_DEBUG_STREAM` environment variable is present.
///
/// Only presence matters; the value (even an empty one) is not inspected.
fn stream_debug_enabled() -> bool {
    let flag = env::var_os("CLAUDIUS_DEBUG_STREAM");
    flag.is_some()
}
92
93fn debug_stream_request(url: &str, params: &MessageCreateParams) {
94 if !stream_debug_enabled() {
95 return;
96 }
97
98 match serde_json::to_string_pretty(params) {
99 Ok(body) => eprintln!("[claudius-debug] stream request POST {url}\n{body}"),
100 Err(err) => eprintln!("[claudius-debug] failed to serialize stream request: {err}"),
101 }
102}
103
104fn format_reqwest_error(err: &reqwest::Error) -> String {
105 let mut parts = vec![err.to_string()];
106 let mut source = StdError::source(err);
107 while let Some(inner) = source {
108 let detail = inner.to_string();
109 if !parts.iter().any(|part| part == &detail) {
110 parts.push(detail);
111 }
112 source = inner.source();
113 }
114 parts.join(": ")
115}
116
/// Upper bound (64 MiB) on the bytes buffered while reading batch results; exceeding it
/// ends the results stream with a streaming error.
const MAX_MESSAGE_BATCH_RESULT_LINE_BYTES: usize = 64 * 1024 * 1024;
118
/// Working state threaded through the JSONL results stream.
struct MessageBatchJsonlState<S> {
    /// Source of raw response bytes.
    byte_stream: S,
    /// Bytes received so far that do not yet form a complete line.
    buffer: Vec<u8>,
    /// Complete, non-empty lines waiting to be parsed and yielded.
    pending_lines: VecDeque<Vec<u8>>,
    /// Set once the byte stream has ended or failed; no further reads are attempted.
    finished: bool,
}
125
126fn map_batch_result_stream_error(err: reqwest::Error) -> Error {
127 let details = format_reqwest_error(&err);
128 if err.is_timeout() {
129 Error::timeout(
130 format!("Message batch results stream timed out: {details}"),
131 None,
132 )
133 } else if err.is_connect() {
134 Error::connection(
135 format!("Message batch results stream connection error: {details}"),
136 Some(Box::new(err)),
137 )
138 } else {
139 Error::streaming(
140 format!("Error in message batch results stream: {details}"),
141 Some(Box::new(err)),
142 )
143 }
144}
145
146fn parse_message_batch_result_line(line: &[u8]) -> Result<MessageBatchResult> {
147 let text = std::str::from_utf8(line).map_err(|e| {
148 Error::encoding(
149 format!("Invalid UTF-8 in message batch results JSONL: {e}"),
150 Some(Box::new(e)),
151 )
152 })?;
153
154 serde_json::from_str::<MessageBatchResult>(text).map_err(|e| {
155 Error::serialization(
156 format!("Failed to parse message batch results JSONL line: {e}"),
157 Some(Box::new(e)),
158 )
159 })
160}
161
/// Strips a single trailing line terminator from `line`.
///
/// At most one `\n` is removed, then at most one `\r`, so both `\n` and `\r\n`
/// endings are handled; any other content is returned untouched.
fn trim_jsonl_line(mut line: Vec<u8>) -> Vec<u8> {
    for terminator in [b'\n', b'\r'] {
        if line.last() == Some(&terminator) {
            line.pop();
        }
    }
    line
}
171
172fn process_message_batch_result_jsonl<S>(
173 byte_stream: S,
174) -> impl Stream<Item = Result<MessageBatchResult>>
175where
176 S: Stream<Item = std::result::Result<Bytes, reqwest::Error>> + Unpin + 'static,
177{
178 let state = MessageBatchJsonlState {
179 byte_stream,
180 buffer: Vec::new(),
181 pending_lines: VecDeque::new(),
182 finished: false,
183 };
184
185 stream::unfold(state, |mut state| async move {
186 loop {
187 if let Some(line) = state.pending_lines.pop_front() {
188 if line.is_empty() {
189 continue;
190 }
191 return Some((parse_message_batch_result_line(&line), state));
192 }
193
194 if state.finished {
195 if state.buffer.is_empty() {
196 return None;
197 }
198 let line = trim_jsonl_line(std::mem::take(&mut state.buffer));
199 if line.is_empty() {
200 continue;
201 }
202 return Some((parse_message_batch_result_line(&line), state));
203 }
204
205 match state.byte_stream.next().await {
206 Some(Ok(bytes)) => {
207 state.buffer.extend_from_slice(&bytes);
208 if state.buffer.len() > MAX_MESSAGE_BATCH_RESULT_LINE_BYTES {
209 state.buffer.clear();
210 state.finished = true;
211 return Some((
212 Err(Error::streaming(
213 format!(
214 "Message batch results JSONL line exceeded maximum size of {} bytes",
215 MAX_MESSAGE_BATCH_RESULT_LINE_BYTES
216 ),
217 None,
218 )),
219 state,
220 ));
221 }
222
223 while let Some(newline) = state.buffer.iter().position(|byte| *byte == b'\n') {
224 let line = trim_jsonl_line(state.buffer.drain(..=newline).collect());
225 if !line.is_empty() {
226 state.pending_lines.push_back(line);
227 }
228 }
229 }
230 Some(Err(err)) => {
231 state.finished = true;
232 return Some((Err(map_batch_result_stream_error(err)), state));
233 }
234 None => {
235 state.finished = true;
236 }
237 }
238 }
239 })
240}
241
242#[derive(Debug, Clone)]
244pub struct Anthropic {
245 api_key: String,
246 client: ReqwestClient,
247 base_url: String,
248 timeout: Duration,
249 max_retries: usize,
250 throughput_ops_sec: f64,
251 reserve_capacity: f64,
252 cached_headers: Arc<HeaderMap>,
254 default_betas: Vec<String>,
256}
257
258impl Anthropic {
259 fn build_http_client(timeout: Duration) -> Result<ReqwestClient> {
260 ReqwestClient::builder()
261 .connect_timeout(timeout)
262 .read_timeout(timeout)
263 .pool_max_idle_per_host(10) .pool_idle_timeout(Duration::from_secs(90))
265 .tcp_keepalive(Duration::from_secs(60))
266 .build()
267 .map_err(|e| {
268 Error::http_client(
269 format!("Failed to build HTTP client: {e}"),
270 Some(Box::new(e)),
271 )
272 })
273 }
274
275 fn resolve_api_key(key_value: &str) -> Result<String> {
277 if let Some(stripped) = key_value.strip_prefix("file://") {
278 let path = if stripped.starts_with('/') {
280 stripped.to_string()
282 } else {
283 stripped.to_string()
285 };
286
287 fs::read_to_string(&path)
288 .map(|content| content.trim().to_string())
289 .map_err(|e| {
290 Error::validation(
291 format!("Failed to read API key from file '{}': {}", path, e),
292 Some("api_key".to_string()),
293 )
294 })
295 } else {
296 Ok(key_value.to_string())
298 }
299 }
300
301 pub fn new(api_key: Option<String>) -> Result<Self> {
310 let api_key = match api_key {
311 Some(key) => Self::resolve_api_key(&key)?,
312 None => match env::var("CLAUDIUS_API_KEY").ok() {
313 Some(key) => Self::resolve_api_key(&key)?,
314 None => {
315 let env_key = env::var("ANTHROPIC_API_KEY").map_err(|_| {
316 Error::authentication(
317 "API key not provided and ANTHROPIC_API_KEY environment variable not set",
318 )
319 })?;
320 Self::resolve_api_key(&env_key)?
321 }
322 },
323 };
324
325 let timeout = DEFAULT_TIMEOUT;
326 let client = Self::build_http_client(timeout)?;
327
328 let cached_headers = Arc::new(Self::build_default_headers(&api_key)?);
330
331 let base_url = env::var("CLAUDIUS_BASE_URL")
333 .or_else(|_| env::var("ANTHROPIC_BASE_URL"))
334 .unwrap_or_else(|_| DEFAULT_API_URL.to_string());
335
336 Ok(Self {
337 api_key,
338 client,
339 base_url,
340 timeout,
341 max_retries: 3,
342 throughput_ops_sec: 1.0 / 60.0,
343 reserve_capacity: 1.0 / 60.0,
344 cached_headers,
345 default_betas: Vec::new(),
346 })
347 }
348
349 pub fn with_base_url(mut self, base_url: String) -> Self {
373 self.base_url = base_url;
374 self
375 }
376
377 pub fn with_timeout(mut self, timeout: Duration) -> Result<Self> {
382 self.timeout = timeout;
383
384 self.client = Self::build_http_client(timeout).map_err(|e| match e {
385 Error::HttpClient { source, .. } => Error::http_client(
386 "Failed to build HTTP client with new timeout",
387 source.map(|src| Box::new(src) as Box<dyn std::error::Error + Send + Sync>),
388 ),
389 other => other,
390 })?;
391 Ok(self)
392 }
393
394 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
398 self.max_retries = max_retries;
399 self
400 }
401
402 pub fn api_key(&self) -> &str {
404 &self.api_key
405 }
406
407 pub fn with_backoff_params(mut self, throughput_ops_sec: f64, reserve_capacity: f64) -> Self {
411 self.throughput_ops_sec = throughput_ops_sec;
412 self.reserve_capacity = reserve_capacity;
413 self
414 }
415
416 pub fn with_default_betas(
421 mut self,
422 betas: impl IntoIterator<Item = impl Into<String>>,
423 ) -> Self {
424 self.default_betas = betas.into_iter().map(Into::into).collect();
425 self
426 }
427
428 pub fn with_base_url_and_timeout(self, base_url: String, timeout: Duration) -> Result<Self> {
432 self.with_base_url(base_url).with_timeout(timeout)
433 }
434
435 fn build_default_headers(api_key: &str) -> Result<HeaderMap> {
437 let mut headers = HeaderMap::new();
438 headers.insert(
439 header::CONTENT_TYPE,
440 HeaderValue::from_static("application/json"),
441 );
442 headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
443 headers.insert(
444 "x-api-key",
445 HeaderValue::from_str(api_key).map_err(|e| {
446 Error::validation(
447 format!("Invalid API key format: {e}"),
448 Some("api_key".to_string()),
449 )
450 })?,
451 );
452 headers.insert(
453 "anthropic-version",
454 HeaderValue::from_static(ANTHROPIC_API_VERSION),
455 );
456 Ok(headers)
457 }
458
459 fn default_headers(&self) -> HeaderMap {
461 (*self.cached_headers).clone()
462 }
463
464 fn build_url(&self, endpoint: &str) -> String {
477 let base = self.base_url.trim_end_matches('/');
478 format!("{}/v1/{}", base, endpoint)
479 }
480
481 fn collect_betas(&self, request_betas: Option<&[String]>, auto_betas: &[&str]) -> Vec<String> {
485 let mut seen = std::collections::HashSet::new();
486 let mut result = Vec::new();
487
488 for beta in &self.default_betas {
489 if seen.insert(beta.as_str().to_owned()) {
490 result.push(beta.clone());
491 }
492 }
493
494 if let Some(betas) = request_betas {
495 for beta in betas {
496 if seen.insert(beta.clone()) {
497 result.push(beta.clone());
498 }
499 }
500 }
501
502 for &beta in auto_betas {
503 if seen.insert(beta.to_owned()) {
504 result.push(beta.to_owned());
505 }
506 }
507
508 result
509 }
510
511 fn headers_with_betas(&self, betas: &[String]) -> Option<HeaderMap> {
515 if betas.is_empty() {
516 return None;
517 }
518 let mut headers = self.default_headers();
519 let value = betas.join(", ");
520 if let Ok(hv) = HeaderValue::from_str(&value) {
522 headers.insert("anthropic-beta", hv);
523 }
524 Some(headers)
525 }
526
527 async fn retry_with_backoff<F, Fut, T>(&self, operation: F) -> Result<T>
529 where
530 F: Fn() -> Fut,
531 Fut: std::future::Future<Output = Result<T>>,
532 {
533 let backoff = ExponentialBackoff::new(self.throughput_ops_sec, self.reserve_capacity);
534 let mut last_error = None;
535
536 for attempt in 0..=self.max_retries {
537 match operation().await {
538 Ok(result) => return Ok(result),
539 Err(error) => {
540 if !error.is_retryable() {
542 return Err(error);
543 }
544
545 if attempt == self.max_retries {
547 last_error = Some(error);
548 break;
549 }
550
551 let exp_backoff_duration = backoff.next();
553
554 let header_backoff_duration = match &error {
556 Error::RateLimit {
557 retry_after: Some(seconds),
558 ..
559 } => Some(Duration::from_secs(*seconds)),
560 Error::ServiceUnavailable {
561 retry_after: Some(seconds),
562 ..
563 } => Some(Duration::from_secs(*seconds)),
564 _ => None,
565 };
566
567 let sleep_duration = match header_backoff_duration {
569 Some(header_duration) => exp_backoff_duration.max(header_duration),
570 None => exp_backoff_duration,
571 };
572
573 CLIENT_REQUEST_RETRIES.click();
574 CLIENT_RETRY_BACKOFF.add(sleep_duration.as_secs_f64());
575 sleep(sleep_duration).await;
576 last_error = Some(error);
577 }
578 }
579 }
580
581 Err(last_error
582 .unwrap_or_else(|| Error::unknown("Failed after retries without capturing error")))
583 }
584
585 async fn process_error_response(response: Response) -> Error {
587 let status = response.status();
588 let status_code = status.as_u16();
589
590 let request_id = response
592 .headers()
593 .get("x-request-id")
594 .and_then(|val| val.to_str().ok())
595 .map(String::from);
596
597 let retry_after = response
598 .headers()
599 .get("retry-after")
600 .and_then(|val| val.to_str().ok())
601 .and_then(|val| val.parse::<u64>().ok());
602
603 #[derive(Deserialize)]
605 struct ErrorResponse {
606 error: Option<ErrorDetail>,
607 }
608
609 #[derive(Deserialize)]
610 struct ErrorDetail {
611 #[serde(rename = "type")]
612 error_type: Option<String>,
613 message: Option<String>,
614 param: Option<String>,
615 }
616
617 let error_body = match response.text().await {
618 Ok(body) => body,
619 Err(e) => {
620 return Error::http_client(
621 format!("Failed to read error response: {e}"),
622 Some(Box::new(e)),
623 );
624 }
625 };
626
627 let parsed_error = serde_json::from_str::<ErrorResponse>(&error_body).ok();
629 let error_type = parsed_error
630 .as_ref()
631 .and_then(|e| e.error.as_ref())
632 .and_then(|e| e.error_type.clone());
633 let error_message = parsed_error
634 .as_ref()
635 .and_then(|e| e.error.as_ref())
636 .and_then(|e| e.message.clone())
637 .unwrap_or_else(|| error_body.clone());
638 let error_param = parsed_error
639 .as_ref()
640 .and_then(|e| e.error.as_ref())
641 .and_then(|e| e.param.clone());
642
643 match status_code {
645 400 => Error::bad_request(error_message, error_param),
646 401 => Error::authentication(error_message),
647 403 => Error::permission(error_message),
648 404 => Error::not_found(error_message, None, None),
649 408 => Error::timeout(error_message, None),
650 429 => Error::rate_limit(error_message, retry_after),
651 500 => Error::internal_server(error_message, request_id),
652 502..=504 => Error::service_unavailable(error_message, retry_after),
653 529 => Error::rate_limit(error_message, retry_after),
654 _ => Error::api(status_code, error_type, error_message, request_id),
655 }
656 }
657
658 fn map_request_error(&self, e: reqwest::Error) -> Error {
660 let details = format_reqwest_error(&e);
661 if e.is_timeout() {
662 Error::timeout(
663 format!("Request timed out: {details}"),
664 Some(self.timeout.as_secs_f64()),
665 )
666 } else if e.is_connect() {
667 Error::connection(format!("Connection error: {details}"), Some(Box::new(e)))
668 } else {
669 Error::http_client(format!("Request failed: {details}"), Some(Box::new(e)))
670 }
671 }
672
673 fn map_response_body_error(&self, e: reqwest::Error) -> Error {
674 let details = format_reqwest_error(&e);
675 if e.is_timeout() {
676 Error::timeout(
677 format!("Response body timed out: {details}"),
678 Some(self.timeout.as_secs_f64()),
679 )
680 } else if e.is_connect() {
681 Error::connection(
682 format!("Response body connection error: {details}"),
683 Some(Box::new(e)),
684 )
685 } else {
686 Error::http_client(
687 format!("Failed to read response body: {details}"),
688 Some(Box::new(e)),
689 )
690 }
691 }
692
693 async fn execute_post_request<T: serde::de::DeserializeOwned>(
695 &self,
696 url: &str,
697 body: &impl serde::Serialize,
698 headers: Option<HeaderMap>,
699 ) -> Result<T> {
700 let headers = headers.unwrap_or_else(|| self.default_headers());
701
702 let response = self
703 .client
704 .post(url)
705 .headers(headers)
706 .json(body)
707 .send()
708 .await
709 .map_err(|e| self.map_request_error(e))?;
710
711 if !response.status().is_success() {
712 return Err(Self::process_error_response(response).await);
713 }
714
715 let body = response
716 .bytes()
717 .await
718 .map_err(|e| self.map_response_body_error(e))?;
719
720 serde_json::from_slice::<T>(&body).map_err(|e| {
721 Error::serialization(format!("Failed to parse response: {e}"), Some(Box::new(e)))
722 })
723 }
724
725 async fn execute_get_request<T: serde::de::DeserializeOwned>(
727 &self,
728 url: &str,
729 query_params: Option<&[(String, String)]>,
730 headers: Option<HeaderMap>,
731 ) -> Result<T> {
732 let headers = headers.unwrap_or_else(|| self.default_headers());
733 let mut request = self.client.get(url).headers(headers);
734
735 if let Some(params) = query_params {
736 for (key, value) in params {
737 request = request.query(&[(key, value)]);
738 }
739 }
740
741 let response = request
742 .send()
743 .await
744 .map_err(|e| self.map_request_error(e))?;
745
746 if !response.status().is_success() {
747 return Err(Self::process_error_response(response).await);
748 }
749
750 let body = response
751 .bytes()
752 .await
753 .map_err(|e| self.map_response_body_error(e))?;
754
755 serde_json::from_slice::<T>(&body).map_err(|e| {
756 Error::serialization(format!("Failed to parse response: {e}"), Some(Box::new(e)))
757 })
758 }
759
760 async fn execute_post_empty_request<T: serde::de::DeserializeOwned>(
762 &self,
763 url: &str,
764 headers: Option<HeaderMap>,
765 ) -> Result<T> {
766 let headers = headers.unwrap_or_else(|| self.default_headers());
767
768 let response = self
769 .client
770 .post(url)
771 .headers(headers)
772 .send()
773 .await
774 .map_err(|e| self.map_request_error(e))?;
775
776 if !response.status().is_success() {
777 return Err(Self::process_error_response(response).await);
778 }
779
780 let body = response
781 .bytes()
782 .await
783 .map_err(|e| self.map_response_body_error(e))?;
784
785 serde_json::from_slice::<T>(&body).map_err(|e| {
786 Error::serialization(format!("Failed to parse response: {e}"), Some(Box::new(e)))
787 })
788 }
789
790 async fn execute_delete_request<T: serde::de::DeserializeOwned>(
792 &self,
793 url: &str,
794 headers: Option<HeaderMap>,
795 ) -> Result<T> {
796 let headers = headers.unwrap_or_else(|| self.default_headers());
797
798 let response = self
799 .client
800 .delete(url)
801 .headers(headers)
802 .send()
803 .await
804 .map_err(|e| self.map_request_error(e))?;
805
806 if !response.status().is_success() {
807 return Err(Self::process_error_response(response).await);
808 }
809
810 let body = response
811 .bytes()
812 .await
813 .map_err(|e| self.map_response_body_error(e))?;
814
815 serde_json::from_slice::<T>(&body).map_err(|e| {
816 Error::serialization(format!("Failed to parse response: {e}"), Some(Box::new(e)))
817 })
818 }
819
820 async fn execute_get_stream_request(
822 &self,
823 url: &str,
824 headers: Option<HeaderMap>,
825 ) -> Result<Response> {
826 let headers = headers.unwrap_or_else(|| self.default_headers());
827
828 let response = self
829 .client
830 .get(url)
831 .headers(headers)
832 .send()
833 .await
834 .map_err(|e| self.map_request_error(e))?;
835
836 if !response.status().is_success() {
837 return Err(Self::process_error_response(response).await);
838 }
839
840 Ok(response)
841 }
842
843 pub async fn send(&self, mut params: MessageCreateParams) -> Result<Message> {
845 let start = Instant::now();
846 CLIENT_REQUESTS.click();
847
848 if let Err(err) = params.validate() {
850 CLIENT_REQUEST_ERRORS.click();
851 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
852 return Err(err);
853 }
854
855 params.stream = false;
857
858 let auto_betas: Vec<&str> = if params.requires_structured_outputs_beta() {
860 vec![STRUCTURED_OUTPUTS_BETA]
861 } else {
862 vec![]
863 };
864 let all_betas = self.collect_betas(params.betas.as_deref(), &auto_betas);
865 let headers = self.headers_with_betas(&all_betas);
866
867 let result = self
868 .retry_with_backoff(|| async {
869 let url = self.build_url("messages");
870 self.execute_post_request(&url, ¶ms, headers.clone())
871 .await
872 })
873 .await;
874
875 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
876 if result.is_err() {
877 CLIENT_REQUEST_ERRORS.click();
878 }
879 result
880 }
881
882 pub async fn send_with_logger(
887 &self,
888 params: MessageCreateParams,
889 logger: &dyn ClientLogger,
890 ) -> Result<Message> {
891 let result = self.send(params).await;
892 if let Ok(ref message) = result {
893 logger.log_response(message);
894 }
895 result
896 }
897
898 pub async fn stream(
902 &self,
903 params: &MessageCreateParams,
904 ) -> Result<impl Stream<Item = Result<MessageStreamEvent>> + use<>> {
905 let start = Instant::now();
906 CLIENT_REQUESTS.click();
907
908 if let Err(err) = params.validate() {
910 CLIENT_REQUEST_ERRORS.click();
911 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
912 return Err(err);
913 }
914
915 if !params.stream {
917 let err = Error::validation(
918 "stream must be true for streaming requests",
919 Some("stream".to_string()),
920 );
921 CLIENT_REQUEST_ERRORS.click();
922 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
923 return Err(err);
924 }
925
926 let auto_betas: Vec<&str> = if params.requires_structured_outputs_beta() {
928 vec![STRUCTURED_OUTPUTS_BETA]
929 } else {
930 vec![]
931 };
932 let all_betas = self.collect_betas(params.betas.as_deref(), &auto_betas);
933
934 let response = self
935 .retry_with_backoff(|| async {
936 let url = self.build_url("messages");
937 debug_stream_request(&url, params);
938
939 let mut headers = self
940 .headers_with_betas(&all_betas)
941 .unwrap_or_else(|| self.default_headers());
942 headers.insert(
943 header::ACCEPT,
944 HeaderValue::from_static("text/event-stream"),
945 );
946
947 let response = self
948 .client
949 .post(&url)
950 .headers(headers)
951 .json(params)
952 .send()
953 .await
954 .map_err(|e| self.map_request_error(e))?;
955
956 if !response.status().is_success() {
957 return Err(Self::process_error_response(response).await);
958 }
959
960 Ok(response)
961 })
962 .await;
963
964 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
965 let response = match response {
966 Ok(response) => response,
967 Err(err) => {
968 CLIENT_REQUEST_ERRORS.click();
969 return Err(err);
970 }
971 };
972
973 let stream = response.bytes_stream();
975
976 Ok(process_message_stream_sse(stream))
978 }
979
980 pub async fn stream_with_logger<'a>(
990 &self,
991 params: &MessageCreateParams,
992 logger: &'a dyn ClientLogger,
993 ) -> Result<LoggingStream<'a>> {
994 let raw_stream = self.stream(params).await?;
995 let (accumulating_stream, receiver) = AccumulatingStream::new(raw_stream);
996 Ok(LoggingStream::new(accumulating_stream, receiver, logger))
997 }
998
999 pub async fn count_tokens(
1004 &self,
1005 params: MessageCountTokensParams,
1006 ) -> Result<MessageTokensCount> {
1007 let start = Instant::now();
1008 CLIENT_REQUESTS.click();
1009
1010 let all_betas = self.collect_betas(params.betas.as_deref(), &[]);
1012 let headers = self.headers_with_betas(&all_betas);
1013
1014 let result = self
1015 .retry_with_backoff(|| async {
1016 let url = self.build_url("messages/count_tokens");
1017 self.execute_post_request(&url, ¶ms, headers.clone())
1018 .await
1019 })
1020 .await;
1021
1022 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1023 if result.is_err() {
1024 CLIENT_REQUEST_ERRORS.click();
1025 }
1026 result
1027 }
1028
1029 pub async fn create_message_batch(
1031 &self,
1032 params: MessageBatchCreateParams,
1033 ) -> Result<MessageBatch> {
1034 let start = Instant::now();
1035 CLIENT_REQUESTS.click();
1036
1037 if let Err(err) = params.validate() {
1038 CLIENT_REQUEST_ERRORS.click();
1039 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1040 return Err(err);
1041 }
1042
1043 let mut request_betas = Vec::new();
1044 if let Some(betas) = ¶ms.betas {
1045 request_betas.extend(betas.iter().cloned());
1046 }
1047 for request in ¶ms.requests {
1048 if let Some(betas) = &request.params.betas {
1049 request_betas.extend(betas.iter().cloned());
1050 }
1051 }
1052
1053 let auto_betas: Vec<&str> = if params
1054 .requests
1055 .iter()
1056 .any(|request| request.params.requires_structured_outputs_beta())
1057 {
1058 vec![STRUCTURED_OUTPUTS_BETA]
1059 } else {
1060 vec![]
1061 };
1062 let all_betas = if request_betas.is_empty() {
1063 self.collect_betas(None, &auto_betas)
1064 } else {
1065 self.collect_betas(Some(&request_betas), &auto_betas)
1066 };
1067 let headers = self.headers_with_betas(&all_betas);
1068
1069 let result = self
1070 .retry_with_backoff(|| async {
1071 let url = self.build_url("messages/batches");
1072 self.execute_post_request(&url, ¶ms, headers.clone())
1073 .await
1074 })
1075 .await;
1076
1077 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1078 if result.is_err() {
1079 CLIENT_REQUEST_ERRORS.click();
1080 }
1081 result
1082 }
1083
1084 pub async fn get_message_batch(&self, message_batch_id: &str) -> Result<MessageBatch> {
1086 let start = Instant::now();
1087 CLIENT_REQUESTS.click();
1088
1089 let all_betas = self.collect_betas(None, &[]);
1090 let headers = self.headers_with_betas(&all_betas);
1091
1092 let result = self
1093 .retry_with_backoff(|| async {
1094 let url = self.build_url(&format!("messages/batches/{message_batch_id}"));
1095 self.execute_get_request(&url, None, headers.clone()).await
1096 })
1097 .await;
1098
1099 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1100 if result.is_err() {
1101 CLIENT_REQUEST_ERRORS.click();
1102 }
1103 result
1104 }
1105
1106 pub async fn retrieve_message_batch(&self, message_batch_id: &str) -> Result<MessageBatch> {
1108 self.get_message_batch(message_batch_id).await
1109 }
1110
1111 pub async fn list_message_batches(
1113 &self,
1114 params: Option<MessageBatchListParams>,
1115 ) -> Result<MessageBatchListResponse> {
1116 let start = Instant::now();
1117 CLIENT_REQUESTS.click();
1118
1119 let request_betas = params.as_ref().and_then(|p| p.betas.as_deref());
1120 let all_betas = self.collect_betas(request_betas, &[]);
1121 let headers = self.headers_with_betas(&all_betas);
1122
1123 let result = self
1124 .retry_with_backoff(|| async {
1125 let url = self.build_url("messages/batches");
1126 let query_params = params.as_ref().map(|p| {
1127 let mut params = Vec::new();
1128 if let Some(ref after_id) = p.after_id {
1129 params.push(("after_id".to_string(), after_id.clone()));
1130 }
1131 if let Some(ref before_id) = p.before_id {
1132 params.push(("before_id".to_string(), before_id.clone()));
1133 }
1134 if let Some(limit) = p.limit {
1135 params.push(("limit".to_string(), limit.to_string()));
1136 }
1137 params
1138 });
1139
1140 self.execute_get_request(&url, query_params.as_deref(), headers.clone())
1141 .await
1142 })
1143 .await;
1144
1145 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1146 if result.is_err() {
1147 CLIENT_REQUEST_ERRORS.click();
1148 }
1149 result
1150 }
1151
1152 pub async fn cancel_message_batch(&self, message_batch_id: &str) -> Result<MessageBatch> {
1154 let start = Instant::now();
1155 CLIENT_REQUESTS.click();
1156
1157 let all_betas = self.collect_betas(None, &[]);
1158 let headers = self.headers_with_betas(&all_betas);
1159
1160 let result = self
1161 .retry_with_backoff(|| async {
1162 let url = self.build_url(&format!("messages/batches/{message_batch_id}/cancel"));
1163 self.execute_post_empty_request(&url, headers.clone()).await
1164 })
1165 .await;
1166
1167 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1168 if result.is_err() {
1169 CLIENT_REQUEST_ERRORS.click();
1170 }
1171 result
1172 }
1173
1174 pub async fn delete_message_batch(
1176 &self,
1177 message_batch_id: &str,
1178 ) -> Result<DeletedMessageBatch> {
1179 let start = Instant::now();
1180 CLIENT_REQUESTS.click();
1181
1182 let all_betas = self.collect_betas(None, &[]);
1183 let headers = self.headers_with_betas(&all_betas);
1184
1185 let result = self
1186 .retry_with_backoff(|| async {
1187 let url = self.build_url(&format!("messages/batches/{message_batch_id}"));
1188 self.execute_delete_request(&url, headers.clone()).await
1189 })
1190 .await;
1191
1192 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1193 if result.is_err() {
1194 CLIENT_REQUEST_ERRORS.click();
1195 }
1196 result
1197 }
1198
1199 pub async fn stream_message_batch_results(
1201 &self,
1202 message_batch_id: &str,
1203 ) -> Result<impl Stream<Item = Result<MessageBatchResult>> + use<>> {
1204 let start = Instant::now();
1205 CLIENT_REQUESTS.click();
1206
1207 let all_betas = self.collect_betas(None, &[]);
1208 let headers = self.headers_with_betas(&all_betas);
1209
1210 let response = self
1211 .retry_with_backoff(|| async {
1212 let url = self.build_url(&format!("messages/batches/{message_batch_id}/results"));
1213 self.execute_get_stream_request(&url, headers.clone()).await
1214 })
1215 .await;
1216
1217 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1218 let response = match response {
1219 Ok(response) => response,
1220 Err(err) => {
1221 CLIENT_REQUEST_ERRORS.click();
1222 return Err(err);
1223 }
1224 };
1225
1226 Ok(process_message_batch_result_jsonl(response.bytes_stream()))
1227 }
1228
1229 pub async fn list_models(&self, params: Option<ModelListParams>) -> Result<ModelListResponse> {
1234 let start = Instant::now();
1235 CLIENT_REQUESTS.click();
1236
1237 let request_betas = params.as_ref().and_then(|p| p.betas.as_deref());
1239 let all_betas = self.collect_betas(request_betas, &[]);
1240 let headers = self.headers_with_betas(&all_betas);
1241
1242 let result = self
1243 .retry_with_backoff(|| async {
1244 let url = self.build_url("models");
1245
1246 let query_params = params.as_ref().map(|p| {
1247 let mut params = Vec::new();
1248 if let Some(ref after_id) = p.after_id {
1249 params.push(("after_id".to_string(), after_id.clone()));
1250 }
1251 if let Some(ref before_id) = p.before_id {
1252 params.push(("before_id".to_string(), before_id.clone()));
1253 }
1254 if let Some(limit) = p.limit {
1255 params.push(("limit".to_string(), limit.to_string()));
1256 }
1257 params
1258 });
1259
1260 self.execute_get_request(&url, query_params.as_deref(), headers.clone())
1261 .await
1262 })
1263 .await;
1264
1265 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1266 if result.is_err() {
1267 CLIENT_REQUEST_ERRORS.click();
1268 }
1269 result
1270 }
1271
1272 pub async fn get_model(&self, model_id: &str) -> Result<ModelInfo> {
1277 let start = Instant::now();
1278 CLIENT_REQUESTS.click();
1279 let result = self
1280 .retry_with_backoff(|| async {
1281 let url = self.build_url(&format!("models/{}", model_id));
1282 self.execute_get_request(&url, None, None).await
1283 })
1284 .await;
1285
1286 CLIENT_REQUEST_DURATION.add(start.elapsed().as_secs_f64());
1287 if result.is_err() {
1288 CLIENT_REQUEST_ERRORS.click();
1289 }
1290 result
1291 }
1292}
1293
1294#[cfg(test)]
1295mod tests {
1296 use super::*;
1297 use crate::{
1298 KnownModel, MessageBatchCreateParams, MessageBatchCreateRequest, MessageBatchListParams,
1299 MessageBatchResultVariant, MessageParam, Model,
1300 };
1301 use futures::StreamExt;
1302 use serde_json::Value;
1303 use std::sync::Arc;
1304 use std::sync::atomic::{AtomicUsize, Ordering};
1305 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1306 use tokio::net::TcpListener;
1307
1308 fn request_headers_end(buffer: &[u8]) -> Option<usize> {
1309 buffer.windows(4).position(|window| window == b"\r\n\r\n")
1310 }
1311
1312 async fn read_http_request_bytes(socket: &mut tokio::net::TcpStream) -> Vec<u8> {
1313 let mut buffer = Vec::new();
1314 let mut chunk = [0_u8; 1024];
1315 loop {
1316 let read = socket.read(&mut chunk).await.unwrap();
1317 assert!(
1318 read > 0,
1319 "client closed the connection before sending headers"
1320 );
1321 buffer.extend_from_slice(&chunk[..read]);
1322 if request_headers_end(&buffer).is_some() {
1323 break;
1324 }
1325 }
1326
1327 let headers_end = request_headers_end(&buffer).unwrap();
1328 let headers = String::from_utf8_lossy(&buffer[..headers_end]);
1329 let content_length = headers
1330 .lines()
1331 .find_map(|line| {
1332 let mut parts = line.splitn(2, ':');
1333 let name = parts.next()?.trim();
1334 let value = parts.next()?.trim();
1335 name.eq_ignore_ascii_case("content-length")
1336 .then(|| value.parse::<usize>().ok())
1337 .flatten()
1338 })
1339 .unwrap_or(0);
1340
1341 while buffer.len() - (headers_end + 4) < content_length {
1342 let read = socket.read(&mut chunk).await.unwrap();
1343 assert!(
1344 read > 0,
1345 "client closed the connection before sending the full body"
1346 );
1347 buffer.extend_from_slice(&chunk[..read]);
1348 }
1349 buffer
1350 }
1351
1352 async fn read_http_request(socket: &mut tokio::net::TcpStream) {
1353 read_http_request_bytes(socket).await;
1354 }
1355
1356 fn split_http_request(request: &[u8]) -> (String, String) {
1357 let headers_end = request_headers_end(request).unwrap();
1358 let headers = String::from_utf8_lossy(&request[..headers_end]).to_string();
1359 let body = String::from_utf8_lossy(&request[headers_end + 4..]).to_string();
1360 (headers, body)
1361 }
1362
1363 fn request_target(headers: &str) -> &str {
1364 headers
1365 .lines()
1366 .next()
1367 .and_then(|line| line.split_whitespace().nth(1))
1368 .unwrap()
1369 }
1370
1371 fn request_method(headers: &str) -> &str {
1372 headers
1373 .lines()
1374 .next()
1375 .and_then(|line| line.split_whitespace().next())
1376 .unwrap()
1377 }
1378
1379 fn header_value<'a>(headers: &'a str, name: &str) -> Option<&'a str> {
1380 headers.lines().find_map(|line| {
1381 let mut parts = line.splitn(2, ':');
1382 let header_name = parts.next()?.trim();
1383 let value = parts.next()?.trim();
1384 header_name.eq_ignore_ascii_case(name).then_some(value)
1385 })
1386 }
1387
1388 async fn write_json_response(socket: &mut tokio::net::TcpStream, body: &str) {
1389 let response = format!(
1390 "HTTP/1.1 200 OK\r\n\
1391Content-Type: application/json\r\n\
1392Content-Length: {}\r\n\
1393Connection: close\r\n\r\n{}",
1394 body.len(),
1395 body
1396 );
1397 socket.write_all(response.as_bytes()).await.unwrap();
1398 socket.shutdown().await.unwrap();
1399 }
1400
1401 async fn start_test_server<F, Fut>(handler: F) -> String
1402 where
1403 F: FnOnce(tokio::net::TcpStream) -> Fut + Send + 'static,
1404 Fut: std::future::Future<Output = ()> + Send + 'static,
1405 {
1406 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1407 let address = listener.local_addr().unwrap();
1408 tokio::spawn(async move {
1409 let (socket, _) = listener.accept().await.unwrap();
1410 socket.set_nodelay(true).unwrap();
1411 handler(socket).await;
1412 });
1413 format!("http://{}", address)
1414 }
1415
1416 #[tokio::test]
1417 async fn retry_logic_with_backoff() {
1418 let client = Anthropic {
1419 api_key: "test".to_string(),
1420 client: ReqwestClient::new(),
1421 base_url: "http://localhost".to_string(),
1422 timeout: Duration::from_secs(1),
1423 max_retries: 2,
1424 throughput_ops_sec: 1.0 / 60.0,
1425 reserve_capacity: 1.0 / 60.0,
1426 cached_headers: Arc::new(HeaderMap::new()),
1427 default_betas: Vec::new(),
1428 };
1429
1430 let attempt_counter = Arc::new(AtomicUsize::new(0));
1431 let counter_clone = attempt_counter.clone();
1432
1433 let result = client
1434 .retry_with_backoff(|| {
1435 let counter = counter_clone.clone();
1436 async move {
1437 let attempt = counter.fetch_add(1, Ordering::SeqCst);
1438 match attempt {
1439 0 | 1 => Err(Error::rate_limit("Rate limited", Some(1))),
1440 _ => Ok("success".to_string()),
1441 }
1442 }
1443 })
1444 .await;
1445
1446 assert!(result.is_ok());
1447 assert_eq!(result.unwrap(), "success");
1448 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1449 }
1450
1451 #[tokio::test]
1452 async fn retry_logic_with_non_retryable_error() {
1453 let client = Anthropic {
1454 api_key: "test".to_string(),
1455 client: ReqwestClient::new(),
1456 base_url: "http://localhost".to_string(),
1457 timeout: Duration::from_secs(1),
1458 max_retries: 2,
1459 throughput_ops_sec: 1.0 / 60.0,
1460 reserve_capacity: 1.0 / 60.0,
1461 cached_headers: Arc::new(HeaderMap::new()),
1462 default_betas: Vec::new(),
1463 };
1464
1465 let attempt_counter = Arc::new(AtomicUsize::new(0));
1466 let counter_clone = attempt_counter.clone();
1467
1468 let result: Result<String> = client
1469 .retry_with_backoff(|| {
1470 let counter = counter_clone.clone();
1471 async move {
1472 counter.fetch_add(1, Ordering::SeqCst);
1473 Err(Error::authentication("Invalid API key"))
1474 }
1475 })
1476 .await;
1477
1478 assert!(result.is_err());
1479 assert!(result.unwrap_err().is_authentication());
1480 assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
1482 }
1483
1484 #[tokio::test]
1485 async fn retry_logic_max_retries_exceeded() {
1486 let client = Anthropic {
1487 api_key: "test".to_string(),
1488 client: ReqwestClient::new(),
1489 base_url: "http://localhost".to_string(),
1490 timeout: Duration::from_secs(1),
1491 max_retries: 2,
1492 throughput_ops_sec: 1.0 / 60.0,
1493 reserve_capacity: 1.0 / 60.0,
1494 cached_headers: Arc::new(HeaderMap::new()),
1495 default_betas: Vec::new(),
1496 };
1497
1498 let attempt_counter = Arc::new(AtomicUsize::new(0));
1499 let counter_clone = attempt_counter.clone();
1500
1501 let result: Result<String> = client
1502 .retry_with_backoff(|| {
1503 let counter = counter_clone.clone();
1504 async move {
1505 counter.fetch_add(1, Ordering::SeqCst);
1506 Err(Error::rate_limit("Always rate limited", Some(1)))
1507 }
1508 })
1509 .await;
1510
1511 assert!(result.is_err());
1512 assert!(result.unwrap_err().is_rate_limit());
1513 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1515 }
1516
1517 #[tokio::test]
1518 async fn error_529_is_retryable() {
1519 let client = Anthropic {
1521 api_key: "test".to_string(),
1522 client: ReqwestClient::new(),
1523 base_url: "http://localhost".to_string(),
1524 timeout: Duration::from_secs(1),
1525 max_retries: 2,
1526 throughput_ops_sec: 1.0 / 60.0,
1527 reserve_capacity: 1.0 / 60.0,
1528 cached_headers: Arc::new(HeaderMap::new()),
1529 default_betas: Vec::new(),
1530 };
1531
1532 let attempt_counter = Arc::new(AtomicUsize::new(0));
1533 let counter_clone = attempt_counter.clone();
1534
1535 let result = client
1536 .retry_with_backoff(|| {
1537 let counter = counter_clone.clone();
1538 async move {
1539 let attempt = counter.fetch_add(1, Ordering::SeqCst);
1540 match attempt {
1541 0 | 1 => {
1542 Err(Error::api(
1544 529,
1545 Some("overloaded_error".to_string()),
1546 "Overloaded".to_string(),
1547 None,
1548 ))
1549 }
1550 _ => Ok("success".to_string()),
1551 }
1552 }
1553 })
1554 .await;
1555
1556 assert!(result.is_ok());
1557 assert_eq!(result.unwrap(), "success");
1558 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1560 }
1561
1562 #[test]
1563 fn error_529_mapped_correctly() {
1564 let error = Error::api(
1566 529,
1567 Some("overloaded_error".to_string()),
1568 "Overloaded".to_string(),
1569 None,
1570 );
1571 assert!(error.is_retryable());
1572
1573 let rate_limit_error = Error::rate_limit("Overloaded", Some(5));
1575 assert!(rate_limit_error.is_retryable());
1576 }
1577
1578 #[test]
1579 fn resolve_api_key_regular_value() {
1580 let result = Anthropic::resolve_api_key("sk-test-key-123");
1581 assert!(result.is_ok());
1582 assert_eq!(result.unwrap(), "sk-test-key-123");
1583 }
1584
1585 #[test]
1586 fn resolve_api_key_file_url_absolute() {
1587 let test_dir = std::env::temp_dir().join(format!("claudius_test_{}", std::process::id()));
1588 std::fs::create_dir_all(&test_dir).unwrap();
1589 let test_file = test_dir.join("test_api_key.txt");
1590 std::fs::write(&test_file, "sk-test-from-file-123\n").unwrap();
1591
1592 let file_url = format!("file://{}", test_file.display());
1593 let result = Anthropic::resolve_api_key(&file_url);
1594
1595 std::fs::remove_dir_all(&test_dir).unwrap();
1596
1597 assert!(result.is_ok());
1598 assert_eq!(result.unwrap(), "sk-test-from-file-123");
1599 }
1600
1601 #[test]
1602 fn resolve_api_key_file_url_relative() {
1603 let test_file = "test_relative_key.txt";
1604 std::fs::write(test_file, "sk-relative-key-456\n").unwrap();
1605
1606 let file_url = format!("file://{}", test_file);
1607 let result = Anthropic::resolve_api_key(&file_url);
1608
1609 std::fs::remove_file(test_file).unwrap();
1610
1611 assert!(result.is_ok());
1612 assert_eq!(result.unwrap(), "sk-relative-key-456");
1613 }
1614
1615 #[test]
1616 fn resolve_api_key_file_url_nonexistent() {
1617 let result = Anthropic::resolve_api_key("file:///nonexistent/path/to/key.txt");
1618 assert!(result.is_err());
1619
1620 let error = result.unwrap_err();
1621 assert!(error.is_validation());
1622 assert!(format!("{}", error).contains("Failed to read API key from file"));
1623 }
1624
1625 #[test]
1626 fn resolve_api_key_file_url_with_whitespace() {
1627 let test_file = "test_whitespace_key.txt";
1628 std::fs::write(test_file, " sk-whitespace-key-789 \n ").unwrap();
1629
1630 let file_url = format!("file://{}", test_file);
1631 let result = Anthropic::resolve_api_key(&file_url);
1632
1633 std::fs::remove_file(test_file).unwrap();
1634
1635 assert!(result.is_ok());
1636 assert_eq!(result.unwrap(), "sk-whitespace-key-789");
1637 }
1638
1639 #[test]
1640 fn client_builder_methods() {
1641 let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1642
1643 let configured_client = client
1645 .with_base_url("https://custom.api.com".to_string())
1646 .with_max_retries(5)
1647 .with_backoff_params(2.0, 1.0);
1648
1649 assert_eq!(configured_client.base_url, "https://custom.api.com");
1650 assert_eq!(configured_client.max_retries, 5);
1651 assert_eq!(configured_client.throughput_ops_sec, 2.0);
1652 assert_eq!(configured_client.reserve_capacity, 1.0);
1653 }
1654
1655 #[test]
1656 fn build_url_default_base() {
1657 let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1658 assert_eq!(
1660 client.build_url("messages"),
1661 "https://api.anthropic.com/v1/messages"
1662 );
1663 assert_eq!(
1664 client.build_url("messages/count_tokens"),
1665 "https://api.anthropic.com/v1/messages/count_tokens"
1666 );
1667 assert_eq!(
1668 client.build_url("models"),
1669 "https://api.anthropic.com/v1/models"
1670 );
1671 }
1672
1673 #[test]
1674 fn build_url_custom_base_without_trailing_slash() {
1675 let client = Anthropic::new(Some("test_key".to_string()))
1676 .unwrap()
1677 .with_base_url("https://api.minimax.io/anthropic".to_string());
1678 assert_eq!(
1679 client.build_url("messages"),
1680 "https://api.minimax.io/anthropic/v1/messages"
1681 );
1682 }
1683
1684 #[test]
1685 fn build_url_custom_base_with_trailing_slash() {
1686 let client = Anthropic::new(Some("test_key".to_string()))
1687 .unwrap()
1688 .with_base_url("https://api.minimax.io/anthropic/".to_string());
1689 assert_eq!(
1690 client.build_url("messages"),
1691 "https://api.minimax.io/anthropic/v1/messages"
1692 );
1693 }
1694
1695 #[test]
1696 fn build_url_minimax_china() {
1697 let client = Anthropic::new(Some("test_key".to_string()))
1698 .unwrap()
1699 .with_base_url("https://api.minimaxi.com/anthropic".to_string());
1700 assert_eq!(
1701 client.build_url("messages"),
1702 "https://api.minimaxi.com/anthropic/v1/messages"
1703 );
1704 assert_eq!(
1705 client.build_url(&format!("models/{}", "claude-3-opus")),
1706 "https://api.minimaxi.com/anthropic/v1/models/claude-3-opus"
1707 );
1708 }
1709
1710 #[test]
1711 fn client_timeout_configuration() {
1712 let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1713 let timeout = Duration::from_secs(30);
1714
1715 let configured_client = client.with_timeout(timeout).unwrap();
1716 assert_eq!(configured_client.timeout, timeout);
1717 }
1718
1719 #[test]
1720 fn client_cached_headers_performance() {
1721 let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1722
1723 let headers1 = client.default_headers();
1725 let headers2 = client.default_headers();
1726
1727 assert_eq!(headers1.len(), headers2.len());
1728 assert!(headers1.contains_key("x-api-key"));
1729 assert!(headers1.contains_key("anthropic-version"));
1730 assert!(headers1.contains_key("content-type"));
1731 }
1732
1733 #[test]
1734 fn request_error_mapping() {
1735 let client = Anthropic::new(Some("test_key".to_string())).unwrap();
1736
1737 let _timeout = Duration::from_secs(30);
1740 assert_eq!(client.timeout, DEFAULT_TIMEOUT); }
1742
1743 #[tokio::test]
1744 async fn concurrent_retry_safety() {
1745 use std::sync::atomic::{AtomicUsize, Ordering};
1746 use tokio::spawn;
1747
1748 let client = Anthropic {
1749 api_key: "test".to_string(),
1750 client: ReqwestClient::new(),
1751 base_url: "http://localhost".to_string(),
1752 timeout: Duration::from_secs(1),
1753 max_retries: 1,
1754 throughput_ops_sec: 1.0,
1755 reserve_capacity: 1.0,
1756 cached_headers: Arc::new(HeaderMap::new()),
1757 default_betas: Vec::new(),
1758 };
1759
1760 let attempt_counter = Arc::new(AtomicUsize::new(0));
1761 let mut handles = vec![];
1762
1763 for _ in 0..3 {
1765 let client_clone = client.clone();
1766 let counter_clone = attempt_counter.clone();
1767
1768 let handle = spawn(async move {
1769 client_clone
1770 .retry_with_backoff(|| {
1771 let counter = counter_clone.clone();
1772 async move {
1773 counter.fetch_add(1, Ordering::SeqCst);
1774 Ok::<String, Error>("success".to_string())
1775 }
1776 })
1777 .await
1778 });
1779 handles.push(handle);
1780 }
1781
1782 for handle in handles {
1784 let result = handle.await.unwrap();
1785 assert!(result.is_ok());
1786 }
1787
1788 assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
1790 }
1791
1792 fn test_client() -> Anthropic {
1793 Anthropic {
1794 api_key: "test".to_string(),
1795 client: ReqwestClient::new(),
1796 base_url: "http://localhost".to_string(),
1797 timeout: Duration::from_secs(1),
1798 max_retries: 0,
1799 throughput_ops_sec: 1.0 / 60.0,
1800 reserve_capacity: 1.0 / 60.0,
1801 cached_headers: Arc::new(HeaderMap::new()),
1802 default_betas: Vec::new(),
1803 }
1804 }
1805
1806 #[test]
1807 fn collect_betas_empty() {
1808 let client = test_client();
1809 let result = client.collect_betas(None, &[]);
1810 assert!(result.is_empty());
1811 }
1812
1813 #[test]
1814 fn collect_betas_client_defaults_only() {
1815 let client = test_client().with_default_betas(["alpha", "bravo"]);
1816 let result = client.collect_betas(None, &[]);
1817 assert_eq!(result, vec!["alpha", "bravo"]);
1818 }
1819
1820 #[test]
1821 fn collect_betas_request_only() {
1822 let client = test_client();
1823 let request = vec!["compact-2026-01-12".to_string()];
1824 let result = client.collect_betas(Some(&request), &[]);
1825 assert_eq!(result, vec!["compact-2026-01-12"]);
1826 }
1827
1828 #[test]
1829 fn collect_betas_auto_only() {
1830 let client = test_client();
1831 let result = client.collect_betas(None, &[STRUCTURED_OUTPUTS_BETA]);
1832 assert_eq!(result, vec![STRUCTURED_OUTPUTS_BETA]);
1833 }
1834
1835 #[test]
1836 fn collect_betas_merges_all_sources() {
1837 let client = test_client().with_default_betas(["default-beta"]);
1838 let request = vec!["request-beta".to_string()];
1839 let result = client.collect_betas(Some(&request), &["auto-beta"]);
1840 assert_eq!(result, vec!["default-beta", "request-beta", "auto-beta"]);
1841 }
1842
1843 #[test]
1844 fn collect_betas_deduplicates() {
1845 let client = test_client().with_default_betas(["shared-beta", "default-only"]);
1846 let request = vec!["shared-beta".to_string(), "request-only".to_string()];
1847 let result = client.collect_betas(Some(&request), &["shared-beta"]);
1848 assert_eq!(result, vec!["shared-beta", "default-only", "request-only"]);
1849 }
1850
1851 #[test]
1852 fn headers_with_betas_none_when_empty() {
1853 let client = test_client();
1854 assert!(client.headers_with_betas(&[]).is_none());
1855 }
1856
1857 #[test]
1858 fn headers_with_betas_joins_with_comma() {
1859 let client = test_client();
1860 let betas = vec!["alpha".to_string(), "bravo".to_string()];
1861 let headers = client.headers_with_betas(&betas).unwrap();
1862 assert_eq!(
1863 headers.get("anthropic-beta").unwrap().to_str().unwrap(),
1864 "alpha, bravo"
1865 );
1866 }
1867
1868 #[test]
1869 fn with_default_betas_builder() {
1870 let client = test_client().with_default_betas(["a", "b", "c"]);
1871 assert_eq!(client.default_betas, vec!["a", "b", "c"]);
1872 }
1873
1874 #[tokio::test]
1875 async fn create_message_batch_posts_expected_body_and_betas() {
1876 let response_body = r#"{
1877 "id": "msgbatch_123",
1878 "type": "message_batch",
1879 "processing_status": "in_progress",
1880 "request_counts": {
1881 "processing": 1,
1882 "succeeded": 0,
1883 "errored": 0,
1884 "canceled": 0,
1885 "expired": 0
1886 },
1887 "ended_at": null,
1888 "created_at": "2024-09-24T18:37:24Z",
1889 "expires_at": "2024-09-25T18:37:24Z",
1890 "cancel_initiated_at": null,
1891 "results_url": null
1892 }"#;
1893
1894 let base_url = start_test_server(move |mut socket| async move {
1895 let request = read_http_request_bytes(&mut socket).await;
1896 let (headers, body) = split_http_request(&request);
1897 assert_eq!(request_method(&headers), "POST");
1898 assert_eq!(request_target(&headers), "/v1/messages/batches");
1899 assert_eq!(
1900 header_value(&headers, "anthropic-beta"),
1901 Some("default-beta, batch-beta, request-beta")
1902 );
1903
1904 let body_json: Value = serde_json::from_str(&body).unwrap();
1905 assert_eq!(body_json["requests"][0]["custom_id"], "request-1");
1906 assert_eq!(
1907 body_json["requests"][0]["params"]["model"],
1908 "claude-haiku-4-5"
1909 );
1910 assert!(
1911 body_json["requests"][0]["params"].get("stream").is_none(),
1912 "stream must not be serialized inside batch params"
1913 );
1914 assert!(body_json.get("betas").is_none());
1915
1916 write_json_response(&mut socket, response_body).await;
1917 })
1918 .await;
1919
1920 let client = Anthropic::new(Some("test_key".to_string()))
1921 .unwrap()
1922 .with_base_url(base_url)
1923 .with_default_betas(["default-beta"]);
1924 let request_params = MessageCreateParams::new(
1925 16,
1926 vec![MessageParam::user("ping")],
1927 Model::Known(KnownModel::ClaudeHaiku45),
1928 )
1929 .with_beta("request-beta");
1930 let params = MessageBatchCreateParams::new(vec![MessageBatchCreateRequest::new(
1931 "request-1",
1932 request_params,
1933 )])
1934 .with_beta("batch-beta");
1935
1936 let batch = client.create_message_batch(params).await.unwrap();
1937 assert_eq!(batch.id, "msgbatch_123");
1938 }
1939
1940 #[tokio::test]
1941 async fn get_message_batch_uses_retrieve_endpoint() {
1942 let response_body = r#"{
1943 "id": "msgbatch_123",
1944 "type": "message_batch",
1945 "processing_status": "ended",
1946 "request_counts": {
1947 "processing": 0,
1948 "succeeded": 1,
1949 "errored": 0,
1950 "canceled": 0,
1951 "expired": 0
1952 },
1953 "ended_at": "2024-09-24T18:39:24Z",
1954 "created_at": "2024-09-24T18:37:24Z",
1955 "expires_at": "2024-09-25T18:37:24Z",
1956 "cancel_initiated_at": null,
1957 "results_url": "https://api.anthropic.com/results"
1958 }"#;
1959
1960 let base_url = start_test_server(move |mut socket| async move {
1961 let request = read_http_request_bytes(&mut socket).await;
1962 let (headers, body) = split_http_request(&request);
1963 assert_eq!(request_method(&headers), "GET");
1964 assert_eq!(
1965 request_target(&headers),
1966 "/v1/messages/batches/msgbatch_123"
1967 );
1968 assert!(body.is_empty());
1969 write_json_response(&mut socket, response_body).await;
1970 })
1971 .await;
1972
1973 let client = Anthropic::new(Some("test_key".to_string()))
1974 .unwrap()
1975 .with_base_url(base_url);
1976 let batch = client.get_message_batch("msgbatch_123").await.unwrap();
1977 assert_eq!(batch.id, "msgbatch_123");
1978 assert_eq!(batch.request_counts.succeeded, 1);
1979 }
1980
1981 #[tokio::test]
1982 async fn list_message_batches_sends_pagination_query_and_beta() {
1983 let response_body = r#"{
1984 "data": [],
1985 "has_more": false,
1986 "first_id": null,
1987 "last_id": null
1988 }"#;
1989
1990 let base_url = start_test_server(move |mut socket| async move {
1991 let request = read_http_request_bytes(&mut socket).await;
1992 let (headers, body) = split_http_request(&request);
1993 assert_eq!(request_method(&headers), "GET");
1994 assert_eq!(
1995 request_target(&headers),
1996 "/v1/messages/batches?after_id=msgbatch_a&limit=20"
1997 );
1998 assert_eq!(header_value(&headers, "anthropic-beta"), Some("list-beta"));
1999 assert!(body.is_empty());
2000 write_json_response(&mut socket, response_body).await;
2001 })
2002 .await;
2003
2004 let client = Anthropic::new(Some("test_key".to_string()))
2005 .unwrap()
2006 .with_base_url(base_url);
2007 let params = MessageBatchListParams::new()
2008 .with_after_id("msgbatch_a")
2009 .with_limit(20)
2010 .with_beta("list-beta");
2011 let page = client.list_message_batches(Some(params)).await.unwrap();
2012 assert!(page.data.is_empty());
2013 assert!(!page.has_more);
2014 }
2015
2016 #[tokio::test]
2017 async fn cancel_message_batch_posts_empty_body() {
2018 let response_body = r#"{
2019 "id": "msgbatch_123",
2020 "type": "message_batch",
2021 "processing_status": "canceling",
2022 "request_counts": {
2023 "processing": 1,
2024 "succeeded": 0,
2025 "errored": 0,
2026 "canceled": 0,
2027 "expired": 0
2028 },
2029 "ended_at": null,
2030 "created_at": "2024-09-24T18:37:24Z",
2031 "expires_at": "2024-09-25T18:37:24Z",
2032 "cancel_initiated_at": "2024-09-24T18:39:03Z",
2033 "results_url": null
2034 }"#;
2035
2036 let base_url = start_test_server(move |mut socket| async move {
2037 let request = read_http_request_bytes(&mut socket).await;
2038 let (headers, body) = split_http_request(&request);
2039 assert_eq!(request_method(&headers), "POST");
2040 assert_eq!(
2041 request_target(&headers),
2042 "/v1/messages/batches/msgbatch_123/cancel"
2043 );
2044 assert!(body.is_empty());
2045 write_json_response(&mut socket, response_body).await;
2046 })
2047 .await;
2048
2049 let client = Anthropic::new(Some("test_key".to_string()))
2050 .unwrap()
2051 .with_base_url(base_url);
2052 let batch = client.cancel_message_batch("msgbatch_123").await.unwrap();
2053 assert_eq!(batch.id, "msgbatch_123");
2054 }
2055
2056 #[tokio::test]
2057 async fn delete_message_batch_uses_delete_endpoint() {
2058 let response_body = r#"{
2059 "id": "msgbatch_123",
2060 "type": "message_batch_deleted"
2061 }"#;
2062
2063 let base_url = start_test_server(move |mut socket| async move {
2064 let request = read_http_request_bytes(&mut socket).await;
2065 let (headers, body) = split_http_request(&request);
2066 assert_eq!(request_method(&headers), "DELETE");
2067 assert_eq!(
2068 request_target(&headers),
2069 "/v1/messages/batches/msgbatch_123"
2070 );
2071 assert!(body.is_empty());
2072 write_json_response(&mut socket, response_body).await;
2073 })
2074 .await;
2075
2076 let client = Anthropic::new(Some("test_key".to_string()))
2077 .unwrap()
2078 .with_base_url(base_url);
2079 let deleted = client.delete_message_batch("msgbatch_123").await.unwrap();
2080 assert_eq!(deleted.r#type, "message_batch_deleted");
2081 }
2082
2083 #[tokio::test]
2084 async fn stream_message_batch_results_preserves_jsonl_order_without_trailing_newline() {
2085 let results_body = concat!(
2086 r#"{"custom_id":"second","result":{"type":"expired"}}"#,
2087 "\n",
2088 r#"{"custom_id":"first","result":{"type":"succeeded","message":{"id":"msg_123","type":"message","role":"assistant","model":"claude-haiku-4-5","content":[{"type":"text","text":"ok"}],"stop_reason":"end_turn","stop_sequence":null,"usage":{"input_tokens":1,"output_tokens":1}}}}"#
2089 );
2090
2091 let base_url = start_test_server(move |mut socket| async move {
2092 let request = read_http_request_bytes(&mut socket).await;
2093 let (headers, body) = split_http_request(&request);
2094 assert_eq!(request_method(&headers), "GET");
2095 assert_eq!(
2096 request_target(&headers),
2097 "/v1/messages/batches/msgbatch_123/results"
2098 );
2099 assert!(body.is_empty());
2100
2101 let response_headers = format!(
2102 "HTTP/1.1 200 OK\r\n\
2103Content-Type: application/jsonl\r\n\
2104Content-Length: {}\r\n\
2105Connection: close\r\n\r\n",
2106 results_body.len()
2107 );
2108 socket.write_all(response_headers.as_bytes()).await.unwrap();
2109 let split_at = results_body.find('\n').unwrap() + 1;
2110 socket
2111 .write_all(&results_body.as_bytes()[..split_at])
2112 .await
2113 .unwrap();
2114 socket
2115 .write_all(&results_body.as_bytes()[split_at..])
2116 .await
2117 .unwrap();
2118 socket.shutdown().await.unwrap();
2119 })
2120 .await;
2121
2122 let client = Anthropic::new(Some("test_key".to_string()))
2123 .unwrap()
2124 .with_base_url(base_url);
2125 let stream = client
2126 .stream_message_batch_results("msgbatch_123")
2127 .await
2128 .unwrap();
2129 let mut stream = std::pin::pin!(stream);
2130
2131 let first = stream.next().await.unwrap().unwrap();
2132 assert_eq!(first.custom_id, "second");
2133 assert!(matches!(first.result, MessageBatchResultVariant::Expired));
2134
2135 let second = stream.next().await.unwrap().unwrap();
2136 assert_eq!(second.custom_id, "first");
2137 assert!(matches!(
2138 second.result,
2139 MessageBatchResultVariant::Succeeded { .. }
2140 ));
2141
2142 assert!(stream.next().await.is_none());
2143 }
2144
2145 #[tokio::test]
2146 async fn streaming_timeout_is_inactivity_based() {
2147 let base_url = start_test_server(|mut socket| async move {
2148 read_http_request(&mut socket).await;
2149 socket
2150 .write_all(
2151 b"HTTP/1.1 200 OK\r\n\
2152Content-Type: text/event-stream\r\n\
2153Cache-Control: no-cache\r\n\
2154Connection: close\r\n\r\n",
2155 )
2156 .await
2157 .unwrap();
2158
2159 for chunk in [
2160 b"event: ping\n".as_slice(),
2161 b"data: {}\n".as_slice(),
2162 b"\n".as_slice(),
2163 ] {
2164 socket.write_all(chunk).await.unwrap();
2165 socket.flush().await.unwrap();
2166 tokio::time::sleep(Duration::from_millis(40)).await;
2167 }
2168 })
2169 .await;
2170
2171 let client = Anthropic::new(Some("test_key".to_string()))
2172 .unwrap()
2173 .with_base_url(base_url)
2174 .with_timeout(Duration::from_millis(75))
2175 .unwrap();
2176 let params = MessageCreateParams::new_streaming(
2177 16,
2178 vec![MessageParam::user("ping")],
2179 Model::Known(KnownModel::ClaudeHaiku45),
2180 );
2181
2182 let mut stream = std::pin::pin!(client.stream(¶ms).await.unwrap());
2183 let first = stream.next().await.unwrap().unwrap();
2184 assert_eq!(first, MessageStreamEvent::Ping);
2185 }
2186
2187 #[tokio::test]
2188 async fn streaming_stall_reports_timeout_error() {
2189 let base_url = start_test_server(|mut socket| async move {
2190 read_http_request(&mut socket).await;
2191 socket
2192 .write_all(
2193 b"HTTP/1.1 200 OK\r\n\
2194Content-Type: text/event-stream\r\n\
2195Cache-Control: no-cache\r\n\
2196Connection: close\r\n\r\n\
2197event: ping\n",
2198 )
2199 .await
2200 .unwrap();
2201 socket.flush().await.unwrap();
2202 tokio::time::sleep(Duration::from_millis(120)).await;
2203 socket.write_all(b"data: {}\n\n").await.unwrap();
2204 socket.flush().await.unwrap();
2205 })
2206 .await;
2207
2208 let client = Anthropic::new(Some("test_key".to_string()))
2209 .unwrap()
2210 .with_base_url(base_url)
2211 .with_timeout(Duration::from_millis(50))
2212 .unwrap();
2213 let params = MessageCreateParams::new_streaming(
2214 16,
2215 vec![MessageParam::user("ping")],
2216 Model::Known(KnownModel::ClaudeHaiku45),
2217 );
2218
2219 let mut stream = std::pin::pin!(client.stream(¶ms).await.unwrap());
2220 let err = stream.next().await.unwrap().unwrap_err();
2221 assert!(matches!(err, Error::Timeout { .. }));
2222 assert!(err.to_string().contains("operation timed out"));
2223 }
2224
2225 #[tokio::test]
2226 async fn non_streaming_body_stall_reports_timeout_error() {
2227 let base_url = start_test_server(|mut socket| async move {
2228 read_http_request(&mut socket).await;
2229 let body_prefix = b"{\"id\":\"msg_1\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"claude-haiku-4-5-20251001\",\"content\":[{\"type\":\"text\",\"text\":\"hel";
2230 let body_suffix = b"lo\"}],\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":1,\"output_tokens\":1}}";
2231 let headers = format!(
2232 "HTTP/1.1 200 OK\r\n\
2233Content-Type: application/json\r\n\
2234Content-Length: {}\r\n\
2235Connection: close\r\n\r\n",
2236 body_prefix.len() + body_suffix.len()
2237 );
2238 socket
2239 .write_all(headers.as_bytes())
2240 .await
2241 .unwrap();
2242 socket.write_all(body_prefix).await.unwrap();
2243 socket.flush().await.unwrap();
2244 tokio::time::sleep(Duration::from_millis(120)).await;
2245 socket.write_all(body_suffix).await.unwrap();
2246 socket.flush().await.unwrap();
2247 socket.shutdown().await.unwrap();
2248 })
2249 .await;
2250
2251 let client = Anthropic::new(Some("test_key".to_string()))
2252 .unwrap()
2253 .with_base_url(base_url)
2254 .with_timeout(Duration::from_millis(50))
2255 .unwrap()
2256 .with_max_retries(0);
2257 let params = MessageCreateParams::new(
2258 16,
2259 vec![MessageParam::user("ping")],
2260 Model::Known(KnownModel::ClaudeHaiku45),
2261 );
2262
2263 let err = client.send(params).await.unwrap_err();
2264 assert!(matches!(err, Error::Timeout { .. }), "{err:?}");
2265 }
2266}