1use crate::{
2 auth::AuthConfig,
3 error::{ApiErrorResponse, RainyError, Result},
4 models::*,
5 retry::{retry_with_backoff, RetryConfig},
6};
7use eventsource_stream::Eventsource;
8use futures::{Stream, StreamExt};
9use reqwest::{
10 header::{HeaderMap, HeaderValue, AUTHORIZATION, USER_AGENT},
11 Client, Response,
12};
13use secrecy::ExposeSecret;
14use serde::Deserialize;
15use std::pin::Pin;
16use std::time::Instant;
17
18#[cfg(feature = "rate-limiting")]
19use governor::{
20 clock::DefaultClock,
21 state::{InMemoryState, NotKeyed},
22 Quota, RateLimiter,
23};
24
/// HTTP client for the Rainy API.
///
/// Holds the configured `reqwest` client together with the authentication
/// settings and the retry policy that the request methods consult.
pub struct RainyClient {
    /// Underlying HTTP client used for every request.
    client: Client,
    /// Authentication and connection settings (base URL, retry toggle, ...).
    auth_config: AuthConfig,
    /// Policy passed to `retry_with_backoff` when retries are enabled.
    retry_config: RetryConfig,

    /// Optional request throttle; only present with the `rate-limiting` feature.
    #[cfg(feature = "rate-limiting")]
    rate_limiter: Option<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
}
62
63impl RainyClient {
64 pub(crate) fn root_url(&self, path: &str) -> String {
65 let normalized = if path.starts_with('/') {
66 path.to_string()
67 } else {
68 format!("/{path}")
69 };
70 format!(
71 "{}{}",
72 self.auth_config.base_url.trim_end_matches('/'),
73 normalized
74 )
75 }
76
77 pub(crate) fn api_v1_url(&self, path: &str) -> String {
78 let normalized = if path.starts_with('/') {
79 path.to_string()
80 } else {
81 format!("/{path}")
82 };
83 format!(
84 "{}/api/v1{}",
85 self.auth_config.base_url.trim_end_matches('/'),
86 normalized
87 )
88 }
89
90 pub fn with_api_key(api_key: impl Into<String>) -> Result<Self> {
103 let auth_config = AuthConfig::new(api_key);
104 Self::with_config(auth_config)
105 }
106
107 pub fn with_config(auth_config: AuthConfig) -> Result<Self> {
119 auth_config.validate()?;
121
122 let mut headers = HeaderMap::new();
124 headers.insert(
125 AUTHORIZATION,
126 HeaderValue::from_str(&format!("Bearer {}", auth_config.api_key.expose_secret()))
127 .map_err(|e| RainyError::Authentication {
128 code: "INVALID_API_KEY".to_string(),
129 message: format!("Invalid API key format: {}", e),
130 retryable: false,
131 })?,
132 );
133 headers.insert(
134 USER_AGENT,
135 HeaderValue::from_str(&auth_config.user_agent).map_err(|e| RainyError::Network {
136 message: format!("Invalid user agent: {}", e),
137 retryable: false,
138 source_error: None,
139 })?,
140 );
141
142 let client = Client::builder()
143 .use_rustls_tls()
144 .min_tls_version(reqwest::tls::Version::TLS_1_2)
145 .https_only(true)
146 .timeout(auth_config.timeout())
147 .default_headers(headers)
148 .build()
149 .map_err(|e| RainyError::Network {
150 message: format!("Failed to create HTTP client: {}", e),
151 retryable: false,
152 source_error: Some(e.to_string()),
153 })?;
154
155 let retry_config = RetryConfig::new(auth_config.max_retries);
156
157 #[cfg(feature = "rate-limiting")]
158 let rate_limiter = Some(RateLimiter::direct(Quota::per_second(
159 std::num::NonZeroU32::new(10).unwrap(),
160 )));
161
162 Ok(Self {
163 client,
164 auth_config,
165 retry_config,
166 #[cfg(feature = "rate-limiting")]
167 rate_limiter,
168 })
169 }
170
171 pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
183 self.retry_config = retry_config;
184 self
185 }
186
187 pub async fn get_available_models(&self) -> Result<AvailableModels> {
193 #[derive(Deserialize)]
194 struct ModelListItem {
195 id: String,
196 }
197 #[derive(Deserialize)]
198 struct ModelsData {
199 data: Vec<ModelListItem>,
200 }
201 #[derive(Deserialize)]
202 struct Envelope {
203 data: ModelsData,
204 }
205
206 let url = self.api_v1_url("/models");
207
208 let operation = || async {
209 let response = self.client.get(&url).send().await?;
210 let envelope: Envelope = self.handle_response(response).await?;
211
212 let mut providers = std::collections::HashMap::<String, Vec<String>>::new();
213 for item in envelope.data.data {
214 let provider = item
215 .id
216 .split_once('/')
217 .map(|(p, _)| p.to_string())
218 .unwrap_or_else(|| "rainy".to_string());
219 providers.entry(provider).or_default().push(item.id);
220 }
221
222 let total_models = providers.values().map(std::vec::Vec::len).sum();
223 let mut active_providers = providers.keys().cloned().collect::<Vec<_>>();
224 active_providers.sort();
225
226 Ok(AvailableModels {
227 providers,
228 total_models,
229 active_providers,
230 })
231 };
232
233 if self.auth_config.enable_retry {
234 retry_with_backoff(&self.retry_config, operation).await
235 } else {
236 operation().await
237 }
238 }
239
240 pub async fn chat_completion(
251 &self,
252 request: ChatCompletionRequest,
253 ) -> Result<(ChatCompletionResponse, RequestMetadata)> {
254 #[cfg(feature = "rate-limiting")]
255 if let Some(ref limiter) = self.rate_limiter {
256 limiter.until_ready().await;
257 }
258
259 let url = self.api_v1_url("/chat/completions");
260 let start_time = Instant::now();
261
262 let operation = || async {
263 let response = self.client.post(&url).json(&request).send().await?;
264
265 let metadata = self.extract_metadata(&response, start_time);
266 let chat_response: ChatCompletionResponse = self.handle_response(response).await?;
267
268 Ok((chat_response, metadata))
269 };
270
271 if self.auth_config.enable_retry {
272 retry_with_backoff(&self.retry_config, operation).await
273 } else {
274 operation().await
275 }
276 }
277
278 pub async fn chat_completion_stream(
288 &self,
289 mut request: ChatCompletionRequest,
290 ) -> Result<Pin<Box<dyn Stream<Item = Result<ChatCompletionChunk>> + Send>>> {
291 request.stream = Some(true);
293
294 #[cfg(feature = "rate-limiting")]
295 if let Some(ref limiter) = self.rate_limiter {
296 limiter.until_ready().await;
297 }
298
299 let url = self.api_v1_url("/chat/completions");
300
301 let operation = || async {
303 let response = self
304 .client
305 .post(&url)
306 .json(&request)
307 .send()
308 .await
309 .map_err(|e| RainyError::Network {
310 message: format!("Failed to send request: {}", e),
311 retryable: true,
312 source_error: Some(e.to_string()),
313 })?;
314
315 self.handle_stream_response(response).await
316 };
317
318 if self.auth_config.enable_retry {
319 retry_with_backoff(&self.retry_config, operation).await
320 } else {
321 operation().await
322 }
323 }
324
325 pub async fn create_response(
327 &self,
328 request: ResponsesRequest,
329 ) -> Result<(ResponsesApiResponse, RequestMetadata)> {
330 #[cfg(feature = "rate-limiting")]
331 if let Some(ref limiter) = self.rate_limiter {
332 limiter.until_ready().await;
333 }
334
335 let url = self.api_v1_url("/responses");
336 let start_time = Instant::now();
337
338 let operation = || async {
339 let response = self.client.post(&url).json(&request).send().await?;
340 let metadata = self.extract_metadata(&response, start_time);
341 let api_response: ResponsesApiResponse = self.handle_response(response).await?;
342 Ok((api_response, metadata))
343 };
344
345 if self.auth_config.enable_retry {
346 retry_with_backoff(&self.retry_config, operation).await
347 } else {
348 operation().await
349 }
350 }
351
352 pub async fn create_response_envelope(
354 &self,
355 request: ResponsesRequest,
356 ) -> Result<(RainyEnvelope<ResponsesApiResponse>, RequestMetadata)> {
357 #[cfg(feature = "rate-limiting")]
358 if let Some(ref limiter) = self.rate_limiter {
359 limiter.until_ready().await;
360 }
361
362 let url = self.api_v1_url("/responses");
363 let start_time = Instant::now();
364
365 let operation = || async {
366 let response = self
367 .client
368 .post(&url)
369 .header("X-Rainy-Response-Mode", "envelope")
370 .json(&request)
371 .send()
372 .await?;
373 let metadata = self.extract_metadata(&response, start_time);
374 let api_response: RainyEnvelope<ResponsesApiResponse> =
375 self.handle_response(response).await?;
376 Ok((api_response, metadata))
377 };
378
379 if self.auth_config.enable_retry {
380 retry_with_backoff(&self.retry_config, operation).await
381 } else {
382 operation().await
383 }
384 }
385
386 pub async fn create_response_stream(
388 &self,
389 mut request: ResponsesRequest,
390 ) -> Result<Pin<Box<dyn Stream<Item = Result<ResponsesStreamEvent>> + Send>>> {
391 request.stream = Some(true);
392
393 #[cfg(feature = "rate-limiting")]
394 if let Some(ref limiter) = self.rate_limiter {
395 limiter.until_ready().await;
396 }
397
398 let url = self.api_v1_url("/responses");
399
400 let operation = || async {
401 let response = self
402 .client
403 .post(&url)
404 .json(&request)
405 .send()
406 .await
407 .map_err(|e| RainyError::Network {
408 message: format!("Failed to send request: {}", e),
409 retryable: true,
410 source_error: Some(e.to_string()),
411 })?;
412
413 let status = response.status();
414 if !status.is_success() {
415 return Err(self
416 .handle_response::<ResponsesApiResponse>(response)
417 .await
418 .err()
419 .unwrap());
420 }
421
422 let stream = response
423 .bytes_stream()
424 .eventsource()
425 .filter_map(|event| async move {
426 match event {
427 Ok(event) => {
428 if event.data.trim() == "[DONE]" {
429 return None;
430 }
431
432 match serde_json::from_str::<ResponsesStreamEvent>(&event.data) {
433 Ok(payload) => Some(Ok(payload)),
434 Err(e) => Some(Err(RainyError::Serialization {
435 message: e.to_string(),
436 source_error: Some(e.to_string()),
437 })),
438 }
439 }
440 Err(e) => Some(Err(RainyError::Network {
441 message: format!("SSE parsing error: {e}"),
442 retryable: true,
443 source_error: Some(e.to_string()),
444 })),
445 }
446 });
447
448 Ok(Box::pin(stream)
449 as Pin<
450 Box<dyn Stream<Item = Result<ResponsesStreamEvent>> + Send>,
451 >)
452 };
453
454 if self.auth_config.enable_retry {
455 retry_with_backoff(&self.retry_config, operation).await
456 } else {
457 operation().await
458 }
459 }
460
461 pub async fn get_models_catalog(&self) -> Result<Vec<ModelCatalogItem>> {
463 #[derive(Deserialize)]
464 struct ModelsCatalogData {
465 data: Vec<ModelCatalogItem>,
466 }
467 #[derive(Deserialize)]
468 struct Envelope {
469 data: ModelsCatalogData,
470 }
471
472 let url = self.api_v1_url("/models/catalog");
473 let operation = || async {
474 let response = self.client.get(&url).send().await?;
475 let envelope: Envelope = self.handle_response(response).await?;
476 Ok(envelope.data.data)
477 };
478
479 if self.auth_config.enable_retry {
480 retry_with_backoff(&self.retry_config, operation).await
481 } else {
482 operation().await
483 }
484 }
485
486 pub async fn select_models(
488 &self,
489 criteria: ModelSelectionCriteria,
490 ) -> Result<Vec<ModelCatalogItem>> {
491 let catalog = self.get_models_catalog().await?;
492 Ok(crate::models::select_models(&catalog, &criteria))
493 }
494
/// Convenience wrapper around `crate::models::build_reasoning_config`.
///
/// Does not read any client state; `model` and `preference` are forwarded
/// unchanged and the helper's result is returned as-is.
pub fn build_reasoning_config(
    &self,
    model: &ModelCatalogItem,
    preference: &ReasoningPreference,
) -> Option<serde_json::Value> {
    crate::models::build_reasoning_config(model, preference)
}
503
504 pub async fn simple_chat(
518 &self,
519 model: impl Into<String>,
520 prompt: impl Into<String>,
521 ) -> Result<String> {
522 let request = ChatCompletionRequest::new(model, vec![ChatMessage::user(prompt)]);
523
524 let (response, _) = self.chat_completion(request).await?;
525
526 Ok(response
527 .choices
528 .into_iter()
529 .next()
530 .map(|choice| choice.message.content)
531 .unwrap_or_default())
532 }
533
534 pub(crate) async fn handle_response<T>(&self, response: Response) -> Result<T>
539 where
540 T: serde::de::DeserializeOwned,
541 {
542 let status = response.status();
543 let headers = response.headers().clone();
544 let request_id = headers
545 .get("x-request-id")
546 .and_then(|v| v.to_str().ok())
547 .map(String::from);
548
549 if status.is_success() {
550 let text = response.text().await?;
551 serde_json::from_str(&text).map_err(|e| RainyError::Serialization {
552 message: format!("Failed to parse response: {}", e),
553 source_error: Some(e.to_string()),
554 })
555 } else {
556 let text = response.text().await.unwrap_or_default();
557
558 if let Ok(error_response) = serde_json::from_str::<ApiErrorResponse>(&text) {
560 let error = error_response.error;
561 self.map_api_error(error, status.as_u16(), request_id)
562 } else {
563 Err(RainyError::Api {
565 code: status.canonical_reason().unwrap_or("UNKNOWN").to_string(),
566 message: if text.is_empty() {
567 format!("HTTP {}", status.as_u16())
568 } else {
569 text
570 },
571 status_code: status.as_u16(),
572 retryable: status.is_server_error(),
573 request_id,
574 })
575 }
576 }
577 }
578
579 pub(crate) async fn handle_stream_response(
581 &self,
582 response: Response,
583 ) -> Result<Pin<Box<dyn Stream<Item = Result<ChatCompletionChunk>> + Send>>> {
584 let status = response.status();
585 let request_id = response
586 .headers()
587 .get("x-request-id")
588 .and_then(|v| v.to_str().ok())
589 .map(String::from);
590
591 if status.is_success() {
592 let stream = response
593 .bytes_stream()
594 .eventsource()
595 .map(move |event| match event {
596 Ok(event) => {
597 if event.data == "[DONE]" {
598 return None;
599 }
600
601 match serde_json::from_str::<ChatCompletionChunk>(&event.data) {
602 Ok(chunk) => Some(Ok(chunk)),
603 Err(e) => Some(Err(RainyError::Serialization {
604 message: format!("Failed to parse stream chunk: {}", e),
605 source_error: Some(e.to_string()),
606 })),
607 }
608 }
609 Err(e) => Some(Err(RainyError::Network {
610 message: format!("Stream error: {}", e),
611 retryable: true,
612 source_error: Some(e.to_string()),
613 })),
614 })
615 .take_while(|x| futures::future::ready(x.is_some()))
616 .map(|x| x.unwrap());
617
618 Ok(Box::pin(stream))
619 } else {
620 let text = response.text().await.unwrap_or_default();
621
622 if let Ok(error_response) = serde_json::from_str::<ApiErrorResponse>(&text) {
624 let error = error_response.error;
625 self.map_api_error(error, status.as_u16(), request_id)
626 } else {
627 Err(RainyError::Api {
628 code: status.canonical_reason().unwrap_or("UNKNOWN").to_string(),
629 message: if text.is_empty() {
630 format!("HTTP {}", status.as_u16())
631 } else {
632 text
633 },
634 status_code: status.as_u16(),
635 retryable: status.is_server_error(),
636 request_id,
637 })
638 }
639 }
640 }
641
642 fn extract_metadata(&self, response: &Response, start_time: Instant) -> RequestMetadata {
646 let headers = response.headers();
647
648 RequestMetadata {
649 response_time: Some(start_time.elapsed().as_millis() as u64),
650 provider: headers
651 .get("x-provider")
652 .and_then(|v| v.to_str().ok())
653 .map(String::from),
654 tokens_used: headers
655 .get("x-tokens-used")
656 .and_then(|v| v.to_str().ok())
657 .and_then(|s| s.parse().ok()),
658 credits_used: headers
659 .get("x-credits-used")
660 .and_then(|v| v.to_str().ok())
661 .and_then(|s| s.parse().ok()),
662 credits_remaining: headers
663 .get("x-credits-remaining")
664 .and_then(|v| v.to_str().ok())
665 .and_then(|s| s.parse().ok()),
666 request_id: headers
667 .get("x-request-id")
668 .and_then(|v| v.to_str().ok())
669 .map(String::from),
670 compat_warnings: headers
671 .get("x-rainy-compat-warnings")
672 .and_then(|v| v.to_str().ok())
673 .and_then(|s| s.parse().ok()),
674 response_mode: headers
675 .get("x-rainy-response-mode")
676 .and_then(|v| v.to_str().ok())
677 .map(String::from),
678 billing_plan: headers
679 .get("x-rainy-billing-plan")
680 .and_then(|v| v.to_str().ok())
681 .map(String::from),
682 rainy_credits_charged: headers
683 .get("x-rainy-credits-charged")
684 .and_then(|v| v.to_str().ok())
685 .and_then(|s| s.parse().ok()),
686 rainy_markup_percent: headers
687 .get("x-rainy-markup-percent")
688 .and_then(|v| v.to_str().ok())
689 .and_then(|s| s.parse().ok()),
690 rainy_daily_credits_remaining: headers
691 .get("x-rainy-daily-credits-remaining")
692 .and_then(|v| v.to_str().ok())
693 .map(String::from),
694 }
695 }
696
697 fn map_api_error<T>(
701 &self,
702 error: crate::error::ApiErrorDetails,
703 status_code: u16,
704 request_id: Option<String>,
705 ) -> Result<T> {
706 let retryable = error.retryable.unwrap_or(status_code >= 500);
707
708 let rainy_error = match error.code.as_str() {
709 "INVALID_API_KEY" | "EXPIRED_API_KEY" => RainyError::Authentication {
710 code: error.code,
711 message: error.message,
712 retryable: false,
713 },
714 "INSUFFICIENT_CREDITS" => {
715 let (current_credits, estimated_cost, reset_date) =
717 if let Some(details) = error.details {
718 let current = details
719 .get("current_credits")
720 .and_then(|v| v.as_f64())
721 .unwrap_or(0.0);
722 let cost = details
723 .get("estimated_cost")
724 .and_then(|v| v.as_f64())
725 .unwrap_or(0.0);
726 let reset = details
727 .get("reset_date")
728 .and_then(|v| v.as_str())
729 .map(String::from);
730 (current, cost, reset)
731 } else {
732 (0.0, 0.0, None)
733 };
734
735 RainyError::InsufficientCredits {
736 code: error.code,
737 message: error.message,
738 current_credits,
739 estimated_cost,
740 reset_date,
741 }
742 }
743 "RATE_LIMIT_EXCEEDED" => {
744 let retry_after = error
745 .details
746 .as_ref()
747 .and_then(|d| d.get("retry_after"))
748 .and_then(|v| v.as_u64());
749
750 RainyError::RateLimit {
751 code: error.code,
752 message: error.message,
753 retry_after,
754 current_usage: None,
755 }
756 }
757 "INVALID_REQUEST" | "MISSING_REQUIRED_FIELD" | "INVALID_MODEL" => {
758 RainyError::InvalidRequest {
759 code: error.code,
760 message: error.message,
761 details: error.details,
762 }
763 }
764 "PROVIDER_ERROR" | "PROVIDER_UNAVAILABLE" => {
765 let provider = error
766 .details
767 .as_ref()
768 .and_then(|d| d.get("provider"))
769 .and_then(|v| v.as_str())
770 .unwrap_or("unknown")
771 .to_string();
772
773 RainyError::Provider {
774 code: error.code,
775 message: error.message,
776 provider,
777 retryable,
778 }
779 }
780 _ => RainyError::Api {
781 code: error.code,
782 message: error.message,
783 status_code,
784 retryable,
785 request_id: request_id.clone(),
786 },
787 };
788
789 Err(rainy_error)
790 }
791
792 pub fn auth_config(&self) -> &AuthConfig {
794 &self.auth_config
795 }
796
797 pub fn base_url(&self) -> &str {
799 &self.auth_config.base_url
800 }
801
802 pub(crate) fn http_client(&self) -> &Client {
806 &self.client
807 }
808
/// Alias for [`RainyClient::get_available_models`]; forwards the call and
/// returns its result unchanged.
pub async fn list_available_models(&self) -> Result<AvailableModels> {
    self.get_available_models().await
}
836
/// Fetches the cowork profile from `/api/v1/cowork/profile`.
///
/// Only compiled with the `cowork` feature and marked deprecated.
///
/// The request goes through `retry_with_backoff` when
/// `auth_config.enable_retry` is set, otherwise it is attempted once.
///
/// # Errors
///
/// Propagates send failures and any error produced by `handle_response`.
#[cfg(feature = "cowork")]
#[deprecated(
    note = "Cowork endpoints are legacy and not supported by Rainy API v3. Migrate to v3 session/org endpoints."
)]
pub async fn get_cowork_profile(&self) -> Result<crate::cowork::CoworkProfile> {
    let endpoint = self.api_v1_url("/cowork/profile");

    let fetch = || async {
        let http_response = self.client.get(&endpoint).send().await?;
        self.handle_response(http_response).await
    };

    if !self.auth_config.enable_retry {
        return fetch().await;
    }
    retry_with_backoff(&self.retry_config, fetch).await
}
862
863 pub(crate) async fn make_request<T: serde::de::DeserializeOwned>(
869 &self,
870 method: reqwest::Method,
871 endpoint: &str,
872 body: Option<serde_json::Value>,
873 ) -> Result<T> {
874 #[cfg(feature = "rate-limiting")]
875 if let Some(ref limiter) = self.rate_limiter {
876 limiter.until_ready().await;
877 }
878
879 let url = self.api_v1_url(endpoint);
880 let headers = self.auth_config.build_headers()?;
881
882 let mut request = self.client.request(method, &url).headers(headers);
883
884 if let Some(body) = body {
885 request = request.json(&body);
886 }
887
888 let response = request.send().await?;
889 self.handle_response(response).await
890 }
891}
892
893impl std::fmt::Debug for RainyClient {
894 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
895 f.debug_struct("RainyClient")
896 .field("base_url", &self.auth_config.base_url)
897 .field("timeout", &self.auth_config.timeout_seconds)
898 .field("max_retries", &self.retry_config.max_retries)
899 .finish()
900 }
901}