use crate::{
auth::AuthConfig,
error::{ApiErrorResponse, RainyError, Result},
models::*,
retry::{retry_with_backoff, RetryConfig},
};
use eventsource_stream::Eventsource;
use futures::{Stream, StreamExt};
use reqwest::{
header::{HeaderMap, HeaderValue, AUTHORIZATION, USER_AGENT},
Client, Response,
};
use secrecy::ExposeSecret;
use serde::Deserialize;
use std::pin::Pin;
use std::time::Instant;
#[cfg(feature = "rate-limiting")]
use governor::{
clock::DefaultClock,
state::{InMemoryState, NotKeyed},
Quota, RateLimiter,
};
/// HTTP client for the Rainy API.
///
/// Bundles the underlying `reqwest` client together with the authentication settings
/// and retry policy used by every request method on this type.
pub struct RainyClient {
/// Underlying HTTP client; built in `with_config` with the default headers installed.
client: Client,
/// Authentication and connection settings (base URL, API key, timeout, retry switch).
auth_config: AuthConfig,
/// Retry policy handed to `retry_with_backoff` when retries are enabled.
retry_config: RetryConfig,
/// Optional client-side rate limiter; only present with the `rate-limiting` feature.
#[cfg(feature = "rate-limiting")]
rate_limiter: Option<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
}
impl RainyClient {
/// Joins `path` onto the configured base URL, without any API version prefix.
///
/// Trailing slashes are stripped from the base URL and a leading `/` is added to
/// `path` when it does not already start with one.
pub(crate) fn root_url(&self, path: &str) -> String {
    let base = self.auth_config.base_url.trim_end_matches('/');
    let separator = if path.starts_with('/') { "" } else { "/" };
    format!("{base}{separator}{path}")
}
/// Joins `path` onto the configured base URL under the `/api/v1` prefix.
///
/// Trailing slashes are stripped from the base URL and a leading `/` is added to
/// `path` when it does not already start with one.
pub(crate) fn api_v1_url(&self, path: &str) -> String {
    let base = self.auth_config.base_url.trim_end_matches('/');
    let separator = if path.starts_with('/') { "" } else { "/" };
    format!("{base}/api/v1{separator}{path}")
}
/// Creates a client from an API key, using `AuthConfig::new` for all other settings.
///
/// # Errors
///
/// Returns whatever error [`RainyClient::with_config`] produces for the resulting
/// configuration.
pub fn with_api_key(api_key: impl Into<String>) -> Result<Self> {
    Self::with_config(AuthConfig::new(api_key))
}
/// Builds a client from a fully specified [`AuthConfig`].
///
/// The configuration is validated first. The HTTP client is then created with rustls,
/// a minimum TLS version of 1.2, HTTPS-only requests, the configured timeout, and
/// default `Authorization: Bearer …` and `User-Agent` headers. The retry policy is
/// seeded from `auth_config.max_retries`. With the `rate-limiting` feature enabled a
/// limiter with a quota of 10 per second is installed.
///
/// # Errors
///
/// * The error returned by `auth_config.validate()` when validation fails.
/// * `RainyError::Authentication` when the API key cannot be used as a header value.
/// * `RainyError::Network` when the user agent is not a valid header value or the
///   HTTP client cannot be constructed.
pub fn with_config(auth_config: AuthConfig) -> Result<Self> {
    auth_config.validate()?;

    let mut authorization =
        HeaderValue::from_str(&format!("Bearer {}", auth_config.api_key.expose_secret()))
            .map_err(|e| RainyError::Authentication {
                code: "INVALID_API_KEY".to_string(),
                message: format!("Invalid API key format: {}", e),
                retryable: false,
            })?;
    // Keep the bearer token out of `Debug` output of header maps and requests.
    authorization.set_sensitive(true);

    let user_agent =
        HeaderValue::from_str(&auth_config.user_agent).map_err(|e| RainyError::Network {
            message: format!("Invalid user agent: {}", e),
            retryable: false,
            source_error: None,
        })?;

    let mut headers = HeaderMap::new();
    headers.insert(AUTHORIZATION, authorization);
    headers.insert(USER_AGENT, user_agent);

    let client = Client::builder()
        .use_rustls_tls()
        .min_tls_version(reqwest::tls::Version::TLS_1_2)
        .https_only(true)
        .timeout(auth_config.timeout())
        .default_headers(headers)
        .build()
        .map_err(|e| RainyError::Network {
            message: format!("Failed to create HTTP client: {}", e),
            retryable: false,
            source_error: Some(e.to_string()),
        })?;

    let retry_config = RetryConfig::new(auth_config.max_retries);

    #[cfg(feature = "rate-limiting")]
    let rate_limiter = Some(RateLimiter::direct(Quota::per_second(
        std::num::NonZeroU32::new(10).expect("10 is a non-zero quota"),
    )));

    Ok(Self {
        client,
        auth_config,
        retry_config,
        #[cfg(feature = "rate-limiting")]
        rate_limiter,
    })
}
/// Replaces the retry policy and returns the updated client (builder style).
pub fn with_retry_config(self, retry_config: RetryConfig) -> Self {
    let mut client = self;
    client.retry_config = retry_config;
    client
}
/// Fetches `/api/v1/models` and groups the returned model ids by provider.
///
/// The provider is the part of a model id before the first `/`; ids without a `/`
/// are filed under `"rainy"`. `active_providers` is the sorted list of provider
/// names and `total_models` the number of ids across all providers.
///
/// The request goes through `retry_with_backoff` when `enable_retry` is set on the
/// auth configuration.
///
/// # Errors
///
/// Returns any error produced while sending the request or by `handle_response`.
pub async fn get_available_models(&self) -> Result<AvailableModels> {
    #[derive(Deserialize)]
    struct ModelListItem {
        id: String,
    }
    #[derive(Deserialize)]
    struct ModelsData {
        data: Vec<ModelListItem>,
    }
    #[derive(Deserialize)]
    struct Envelope {
        data: ModelsData,
    }

    let endpoint = self.api_v1_url("/models");
    let attempt = || async {
        let http_response = self.client.get(&endpoint).send().await?;
        let envelope: Envelope = self.handle_response(http_response).await?;

        let mut providers: std::collections::HashMap<String, Vec<String>> =
            std::collections::HashMap::new();
        for ModelListItem { id } in envelope.data.data {
            let provider = match id.split_once('/') {
                Some((prefix, _)) => prefix.to_owned(),
                None => "rainy".to_owned(),
            };
            providers.entry(provider).or_default().push(id);
        }

        let mut active_providers: Vec<String> = providers.keys().cloned().collect();
        active_providers.sort();
        let total_models = providers.values().map(Vec::len).sum();

        Ok(AvailableModels {
            providers,
            total_models,
            active_providers,
        })
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Posts `request` to `/api/v1/chat/completions` and returns the parsed response
/// together with metadata read from the response headers.
///
/// With the `rate-limiting` feature enabled, the call first waits for the limiter.
/// The timer for `RequestMetadata::response_time` starts once, before the first
/// attempt, so it spans any retries. Retries are used when `enable_retry` is set.
///
/// # Errors
///
/// Returns any error produced while sending the request or by `handle_response`.
pub async fn chat_completion(
    &self,
    request: ChatCompletionRequest,
) -> Result<(ChatCompletionResponse, RequestMetadata)> {
    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let endpoint = self.api_v1_url("/chat/completions");
    let started = Instant::now();
    let attempt = || async {
        let http_response = self.client.post(&endpoint).json(&request).send().await?;
        let metadata = self.extract_metadata(&http_response, started);
        let parsed: ChatCompletionResponse = self.handle_response(http_response).await?;
        Ok((parsed, metadata))
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Posts `request` to `/api/v1/chat/completions` with the
/// `X-Rainy-Response-Mode: envelope` header and returns the enveloped response
/// together with metadata read from the response headers.
///
/// With the `rate-limiting` feature enabled, the call first waits for the limiter.
/// The response-time timer starts once, before the first attempt. Retries are used
/// when `enable_retry` is set.
///
/// # Errors
///
/// Returns any error produced while sending the request or by `handle_response`.
pub async fn chat_completion_envelope(
    &self,
    request: ChatCompletionRequest,
) -> Result<(RainyEnvelope<ChatCompletionResponse>, RequestMetadata)> {
    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let endpoint = self.api_v1_url("/chat/completions");
    let started = Instant::now();
    let attempt = || async {
        let http_request = self
            .client
            .post(&endpoint)
            .header("X-Rainy-Response-Mode", "envelope")
            .json(&request);
        let http_response = http_request.send().await?;
        let metadata = self.extract_metadata(&http_response, started);
        let envelope: RainyEnvelope<ChatCompletionResponse> =
            self.handle_response(http_response).await?;
        Ok((envelope, metadata))
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Posts an OpenAI-shaped chat request to `/api/v1/chat/completions` with the
/// `X-Rainy-Response-Mode: envelope` header and returns the enveloped response
/// together with metadata read from the response headers.
///
/// With the `rate-limiting` feature enabled, the call first waits for the limiter.
/// The response-time timer starts once, before the first attempt. Retries are used
/// when `enable_retry` is set.
///
/// # Errors
///
/// Returns any error produced while sending the request or by `handle_response`.
pub async fn openai_chat_completion_envelope(
    &self,
    request: OpenAIChatCompletionRequest,
) -> Result<(RainyEnvelope<OpenAIChatCompletionResponse>, RequestMetadata)> {
    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let endpoint = self.api_v1_url("/chat/completions");
    let started = Instant::now();
    let attempt = || async {
        let http_request = self
            .client
            .post(&endpoint)
            .header("X-Rainy-Response-Mode", "envelope")
            .json(&request);
        let http_response = http_request.send().await?;
        let metadata = self.extract_metadata(&http_response, started);
        let envelope: RainyEnvelope<OpenAIChatCompletionResponse> =
            self.handle_response(http_response).await?;
        Ok((envelope, metadata))
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Starts a streaming chat completion and yields only the content chunks.
///
/// `request.stream` is forced to `Some(true)`. The event stream returned by
/// `handle_chat_stream_response` is filtered: `Chunk` events are passed through,
/// `Billing` and `Raw` events are dropped, and errors are forwarded unchanged.
///
/// With the `rate-limiting` feature enabled, the call first waits for the limiter.
/// Establishing the stream is retried when `enable_retry` is set.
///
/// # Errors
///
/// Returns `RainyError::Network` when the request cannot be sent, or the error
/// produced by `handle_chat_stream_response` for a non-success status.
pub async fn chat_completion_stream(
    &self,
    mut request: ChatCompletionRequest,
) -> Result<Pin<Box<dyn Stream<Item = Result<ChatCompletionStreamResponse>> + Send>>> {
    request.stream = Some(true);

    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let endpoint = self.api_v1_url("/chat/completions");
    let attempt = || async {
        let sent = self.client.post(&endpoint).json(&request).send().await;
        let http_response = sent.map_err(|source| RainyError::Network {
            message: format!("Failed to send request: {}", source),
            retryable: true,
            source_error: Some(source.to_string()),
        })?;

        let events = self.handle_chat_stream_response(http_response).await?;
        let chunks = events.filter_map(|event| async move {
            match event {
                Err(error) => Some(Err(error)),
                Ok(ChatStreamEvent::Chunk(chunk)) => Some(Ok(chunk)),
                Ok(ChatStreamEvent::Billing(_) | ChatStreamEvent::Raw(_)) => None,
            }
        });

        let boxed: Pin<Box<dyn Stream<Item = Result<ChatCompletionStreamResponse>> + Send>> =
            Box::pin(chunks);
        Ok(boxed)
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Posts `request` to `/api/v1/responses` and returns the parsed response together
/// with metadata read from the response headers.
///
/// With the `rate-limiting` feature enabled, the call first waits for the limiter.
/// The response-time timer starts once, before the first attempt. Retries are used
/// when `enable_retry` is set.
///
/// # Errors
///
/// Returns any error produced while sending the request or by `handle_response`.
pub async fn create_response(
    &self,
    request: ResponsesRequest,
) -> Result<(ResponsesApiResponse, RequestMetadata)> {
    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let endpoint = self.api_v1_url("/responses");
    let started = Instant::now();
    let attempt = || async {
        let http_response = self.client.post(&endpoint).json(&request).send().await?;
        let metadata = self.extract_metadata(&http_response, started);
        let parsed: ResponsesApiResponse = self.handle_response(http_response).await?;
        Ok((parsed, metadata))
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Posts `request` to `/api/v1/responses` with the
/// `X-Rainy-Response-Mode: envelope` header and returns the enveloped response
/// together with metadata read from the response headers.
///
/// With the `rate-limiting` feature enabled, the call first waits for the limiter.
/// The response-time timer starts once, before the first attempt. Retries are used
/// when `enable_retry` is set.
///
/// # Errors
///
/// Returns any error produced while sending the request or by `handle_response`.
pub async fn create_response_envelope(
    &self,
    request: ResponsesRequest,
) -> Result<(RainyEnvelope<ResponsesApiResponse>, RequestMetadata)> {
    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let endpoint = self.api_v1_url("/responses");
    let started = Instant::now();
    let attempt = || async {
        let http_request = self
            .client
            .post(&endpoint)
            .header("X-Rainy-Response-Mode", "envelope")
            .json(&request);
        let http_response = http_request.send().await?;
        let metadata = self.extract_metadata(&http_response, started);
        let envelope: RainyEnvelope<ResponsesApiResponse> =
            self.handle_response(http_response).await?;
        Ok((envelope, metadata))
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Starts a streaming request against `/api/v1/responses`.
///
/// `request.stream` is forced to `Some(true)` and the response body is decoded into
/// `ResponsesStreamEvent` items by `handle_stream_response`.
///
/// With the `rate-limiting` feature enabled, the call first waits for the limiter.
/// Establishing the stream is retried when `enable_retry` is set.
///
/// # Errors
///
/// Returns `RainyError::Network` when the request cannot be sent, or the error
/// produced by `handle_stream_response` for a non-success status.
pub async fn create_response_stream(
    &self,
    mut request: ResponsesRequest,
) -> Result<Pin<Box<dyn Stream<Item = Result<ResponsesStreamEvent>> + Send>>> {
    request.stream = Some(true);

    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let endpoint = self.api_v1_url("/responses");
    let attempt = || async {
        let sent = self.client.post(&endpoint).json(&request).send().await;
        let http_response = sent.map_err(|source| RainyError::Network {
            message: format!("Failed to send request: {}", source),
            retryable: true,
            source_error: Some(source.to_string()),
        })?;
        self.handle_stream_response(http_response).await
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Starts a streaming chat completion and yields every decoded `ChatStreamEvent`.
///
/// `request.stream` is forced to `Some(true)`. Unlike `chat_completion_stream`, no
/// event kinds are filtered out here.
///
/// With the `rate-limiting` feature enabled, the call first waits for the limiter.
/// Establishing the stream is retried when `enable_retry` is set.
///
/// # Errors
///
/// Returns `RainyError::Network` when the request cannot be sent, or the error
/// produced by `handle_chat_stream_response` for a non-success status.
pub async fn chat_completion_stream_events(
    &self,
    mut request: ChatCompletionRequest,
) -> Result<Pin<Box<dyn Stream<Item = Result<ChatStreamEvent>> + Send>>> {
    request.stream = Some(true);

    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let endpoint = self.api_v1_url("/chat/completions");
    let attempt = || async {
        let sent = self.client.post(&endpoint).json(&request).send().await;
        let http_response = sent.map_err(|source| RainyError::Network {
            message: format!("Failed to send request: {}", source),
            retryable: true,
            source_error: Some(source.to_string()),
        })?;
        self.handle_chat_stream_response(http_response).await
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Fetches `/api/v1/models/catalog` and returns the catalog items.
///
/// The response is expected to nest the list two levels deep (`data.data`). The
/// request goes through `retry_with_backoff` when `enable_retry` is set.
///
/// # Errors
///
/// Returns any error produced while sending the request or by `handle_response`.
pub async fn get_models_catalog(&self) -> Result<Vec<ModelCatalogItem>> {
    #[derive(Deserialize)]
    struct ModelsCatalogData {
        data: Vec<ModelCatalogItem>,
    }
    #[derive(Deserialize)]
    struct Envelope {
        data: ModelsCatalogData,
    }

    let endpoint = self.api_v1_url("/models/catalog");
    let attempt = || async {
        let http_response = self.client.get(&endpoint).send().await?;
        let Envelope { data: catalog } = self.handle_response(http_response).await?;
        Ok(catalog.data)
    };

    if !self.auth_config.enable_retry {
        return attempt().await;
    }
    retry_with_backoff(&self.retry_config, attempt).await
}
/// Fetches the model catalog and filters it with `crate::models::select_models`.
///
/// # Errors
///
/// Returns the error from [`RainyClient::get_models_catalog`] when the catalog
/// cannot be fetched.
pub async fn select_models(
    &self,
    criteria: ModelSelectionCriteria,
) -> Result<Vec<ModelCatalogItem>> {
    self.get_models_catalog()
        .await
        .map(|catalog| crate::models::select_models(&catalog, &criteria))
}
/// Builds a reasoning configuration for `model` from `preference`.
///
/// Convenience wrapper that forwards to `crate::models::build_reasoning_config`;
/// no client state is used.
pub fn build_reasoning_config(
&self,
model: &ModelCatalogItem,
preference: &ReasoningPreference,
) -> Option<serde_json::Value> {
crate::models::build_reasoning_config(model, preference)
}
/// Sends a single user `prompt` to `model` and returns the text of the first choice.
///
/// An empty string is returned when the response contains no choices.
///
/// # Errors
///
/// Returns the error from [`RainyClient::chat_completion`].
pub async fn simple_chat(
    &self,
    model: impl Into<String>,
    prompt: impl Into<String>,
) -> Result<String> {
    let messages = vec![ChatMessage::user(prompt)];
    let request = ChatCompletionRequest::new(model, messages);
    let (response, _metadata) = self.chat_completion(request).await?;

    let first_choice = response.choices.into_iter().next();
    Ok(match first_choice {
        Some(choice) => choice.message.content,
        None => String::new(),
    })
}
/// Converts an HTTP response into `T` or into a `RainyError`.
///
/// On a success status the body is read in full and deserialized as JSON. On any
/// other status the `x-request-id` header and the body text are handed to
/// `handle_error_text` to build the error.
///
/// # Errors
///
/// * The converted `reqwest` error when the success body cannot be read.
/// * `RainyError::Serialization` when the success body is not valid JSON for `T`.
/// * The error built by `handle_error_text` for non-success statuses.
pub(crate) async fn handle_response<T>(&self, response: Response) -> Result<T>
where
    T: serde::de::DeserializeOwned,
{
    let status = response.status();
    if status.is_success() {
        let body = response.bytes().await?;
        return serde_json::from_slice(&body).map_err(|e| RainyError::Serialization {
            message: format!("Failed to parse response: {}", e),
            source_error: Some(e.to_string()),
        });
    }

    // Only the error path needs the request id, and it must be copied out before
    // `text()` consumes the response. Reading the single header avoids cloning the
    // whole header map.
    let request_id = response
        .headers()
        .get("x-request-id")
        .and_then(|v| v.to_str().ok())
        .map(String::from);
    // A body that cannot be read is treated as empty so the status still surfaces.
    let text = response.text().await.unwrap_or_default();
    self.handle_error_text(status, request_id, text)
}
/// Turns a server-sent-events response into a stream of deserialized `T` items.
///
/// A non-success status is converted into an error through `handle_error_text`
/// before any streaming starts. While streaming, events whose trimmed data is empty
/// or equals `[DONE]` (ASCII case-insensitive) are skipped; every other payload is
/// parsed as JSON into `T`.
///
/// # Errors
///
/// The outer `Result` carries the error for a non-success status. Stream items are
/// `RainyError::Serialization` for unparsable payloads and `RainyError::Network`
/// (marked retryable) for transport or event-stream errors.
pub(crate) async fn handle_stream_response<T>(
    &self,
    response: Response,
) -> Result<Pin<Box<dyn Stream<Item = Result<T>> + Send>>>
where
    T: serde::de::DeserializeOwned + Send + 'static,
{
    let status = response.status();
    let request_id = response
        .headers()
        .get("x-request-id")
        .and_then(|value| value.to_str().ok())
        .map(String::from);

    if !status.is_success() {
        let text = response.text().await.unwrap_or_default();
        return self.handle_error_text(status, request_id, text);
    }

    let events = response.bytes_stream().eventsource();
    let items = events.filter_map(|item| async move {
        let message = match item {
            Ok(message) => message,
            Err(e) => {
                return Some(Err(RainyError::Network {
                    message: format!("Stream error: {}", e),
                    retryable: true,
                    source_error: Some(e.to_string()),
                }))
            }
        };

        let payload = message.data.trim();
        if payload.is_empty() || payload.eq_ignore_ascii_case("[DONE]") {
            return None;
        }

        let parsed = serde_json::from_str::<T>(payload);
        Some(parsed.map_err(|e| RainyError::Serialization {
            message: format!("Failed to parse stream chunk: {}", e),
            source_error: Some(e.to_string()),
        }))
    });

    Ok(Box::pin(items))
}
/// Turns a server-sent-events chat response into a stream of `ChatStreamEvent`s.
///
/// A non-success status is converted into an error through `handle_error_text`
/// before any streaming starts. While streaming, events whose trimmed data is empty
/// or equals `[DONE]` (ASCII case-insensitive) are skipped; every other payload is
/// parsed as generic JSON and classified by `ChatStreamEvent::from_value`.
///
/// # Errors
///
/// The outer `Result` carries the error for a non-success status. Stream items are
/// `RainyError::Serialization` for payloads that are not valid JSON and
/// `RainyError::Network` (marked retryable) for transport or event-stream errors.
pub(crate) async fn handle_chat_stream_response(
    &self,
    response: Response,
) -> Result<Pin<Box<dyn Stream<Item = Result<ChatStreamEvent>> + Send>>> {
    let status = response.status();
    let request_id = response
        .headers()
        .get("x-request-id")
        .and_then(|value| value.to_str().ok())
        .map(String::from);

    if !status.is_success() {
        let text = response.text().await.unwrap_or_default();
        return self.handle_error_text(status, request_id, text);
    }

    let events = response.bytes_stream().eventsource();
    let items = events.filter_map(|item| async move {
        let message = match item {
            Ok(message) => message,
            Err(e) => {
                return Some(Err(RainyError::Network {
                    message: format!("Stream error: {}", e),
                    retryable: true,
                    source_error: Some(e.to_string()),
                }))
            }
        };

        let payload = message.data.trim();
        if payload.is_empty() || payload.eq_ignore_ascii_case("[DONE]") {
            return None;
        }

        let parsed = serde_json::from_str::<serde_json::Value>(payload);
        Some(match parsed {
            Ok(value) => Ok(ChatStreamEvent::from_value(value)),
            Err(e) => Err(RainyError::Serialization {
                message: format!("Failed to parse stream chunk: {}", e),
                source_error: Some(e.to_string()),
            }),
        })
    });

    Ok(Box::pin(items))
}
/// Builds the error for a non-success response from its status and body text.
///
/// When the body parses as an `ApiErrorResponse`, the structured error is mapped by
/// `map_api_error`. Otherwise a generic `RainyError::Api` is produced: the code is
/// the status' canonical reason phrase (or `UNKNOWN`), the message is the raw body
/// (or `HTTP <status>` when the body is empty), and it is retryable only for 5xx
/// statuses.
///
/// Always returns `Err`; the generic `T` lets callers return it directly.
fn handle_error_text<T>(
    &self,
    status: reqwest::StatusCode,
    request_id: Option<String>,
    text: String,
) -> Result<T> {
    match serde_json::from_str::<ApiErrorResponse>(&text) {
        Ok(parsed) => self.map_api_error(parsed.error, status.as_u16(), request_id),
        Err(_) => {
            let code = status.canonical_reason().unwrap_or("UNKNOWN").to_string();
            let message = if text.is_empty() {
                format!("HTTP {}", status.as_u16())
            } else {
                text
            };
            Err(RainyError::Api {
                code,
                message,
                status_code: status.as_u16(),
                retryable: status.is_server_error(),
                request_id,
            })
        }
    }
}
/// Collects request metadata from the response headers.
///
/// `response_time` is the time elapsed since `start_time`, in milliseconds. Every
/// other field comes from an `x-…` response header and is `None` when the header is
/// missing, is not valid visible ASCII, or (for parsed fields) fails to parse.
fn extract_metadata(&self, response: &Response, start_time: Instant) -> RequestMetadata {
    let headers = response.headers();
    // Header value as text, when present and representable as a string.
    let text_of = |name: &str| headers.get(name).and_then(|value| value.to_str().ok());
    // Same lookup, copied into an owned `String`.
    let owned_of = |name: &str| text_of(name).map(String::from);

    RequestMetadata {
        response_time: Some(start_time.elapsed().as_millis() as u64),
        provider: owned_of("x-provider"),
        tokens_used: text_of("x-tokens-used").and_then(|s| s.parse().ok()),
        credits_used: text_of("x-credits-used").and_then(|s| s.parse().ok()),
        credits_remaining: text_of("x-credits-remaining").and_then(|s| s.parse().ok()),
        request_id: owned_of("x-request-id"),
        compat_warnings: text_of("x-rainy-compat-warnings").and_then(|s| s.parse().ok()),
        response_mode: owned_of("x-rainy-response-mode"),
        billing_plan: owned_of("x-rainy-billing-plan"),
        rainy_credits_charged: text_of("x-rainy-credits-charged")
            .and_then(|s| s.parse().ok()),
        rainy_daily_credits_remaining: owned_of("x-rainy-daily-credits-remaining"),
        rainy_sanitized_params: owned_of("x-rainy-sanitized-params"),
        rainy_billing_adjustment: owned_of("x-rainy-billing-adjustment"),
        rainy_billing_outstanding_credits: text_of("x-rainy-billing-outstanding-credits")
            .and_then(|s| s.parse().ok()),
    }
}
/// Maps a structured API error onto the matching `RainyError` variant.
///
/// The variant is chosen by `error.code`:
///
/// * `INVALID_API_KEY`, `EXPIRED_API_KEY` → `Authentication` (never retryable).
/// * `INSUFFICIENT_CREDITS` → `InsufficientCredits`, reading `current_credits`,
///   `estimated_cost` and `reset_date` from the details (numbers default to `0.0`).
/// * `RATE_LIMIT_EXCEEDED` → `RateLimit`, reading `retry_after` from the details.
/// * `INVALID_REQUEST`, `MISSING_REQUIRED_FIELD`, `INVALID_MODEL` → `InvalidRequest`.
/// * `PROVIDER_ERROR`, `PROVIDER_UNAVAILABLE` → `Provider`, with the provider name
///   from the details (or `"unknown"`).
/// * Anything else → `Api`, carrying `status_code` and `request_id`.
///
/// Where a variant has a `retryable` flag that is not fixed, it is the server's
/// explicit `error.retryable` when present, otherwise `status_code >= 500`.
///
/// Always returns `Err`; the generic `T` lets callers return it directly.
fn map_api_error<T>(
    &self,
    error: crate::error::ApiErrorDetails,
    status_code: u16,
    request_id: Option<String>,
) -> Result<T> {
    let retryable = error.retryable.unwrap_or(status_code >= 500);
    let rainy_error = match error.code.as_str() {
        "INVALID_API_KEY" | "EXPIRED_API_KEY" => RainyError::Authentication {
            code: error.code,
            message: error.message,
            retryable: false,
        },
        "INSUFFICIENT_CREDITS" => {
            let (current_credits, estimated_cost, reset_date) =
                if let Some(details) = error.details {
                    let current = details
                        .get("current_credits")
                        .and_then(|v| v.as_f64())
                        .unwrap_or(0.0);
                    let cost = details
                        .get("estimated_cost")
                        .and_then(|v| v.as_f64())
                        .unwrap_or(0.0);
                    let reset = details
                        .get("reset_date")
                        .and_then(|v| v.as_str())
                        .map(String::from);
                    (current, cost, reset)
                } else {
                    (0.0, 0.0, None)
                };
            RainyError::InsufficientCredits {
                code: error.code,
                message: error.message,
                current_credits,
                estimated_cost,
                reset_date,
            }
        }
        "RATE_LIMIT_EXCEEDED" => {
            let retry_after = error
                .details
                .as_ref()
                .and_then(|d| d.get("retry_after"))
                .and_then(|v| v.as_u64());
            RainyError::RateLimit {
                code: error.code,
                message: error.message,
                retry_after,
                current_usage: None,
            }
        }
        "INVALID_REQUEST" | "MISSING_REQUIRED_FIELD" | "INVALID_MODEL" => {
            RainyError::InvalidRequest {
                code: error.code,
                message: error.message,
                details: error.details,
            }
        }
        "PROVIDER_ERROR" | "PROVIDER_UNAVAILABLE" => {
            let provider = error
                .details
                .as_ref()
                .and_then(|d| d.get("provider"))
                .and_then(|v| v.as_str())
                .unwrap_or("unknown")
                .to_string();
            RainyError::Provider {
                code: error.code,
                message: error.message,
                provider,
                retryable,
            }
        }
        _ => RainyError::Api {
            code: error.code,
            message: error.message,
            status_code,
            retryable,
            // `request_id` is owned and unused elsewhere, so it is moved, not cloned.
            request_id,
        },
    };
    Err(rainy_error)
}
/// Returns the authentication configuration this client was built with.
pub fn auth_config(&self) -> &AuthConfig {
&self.auth_config
}
/// Returns the configured base URL exactly as stored (no slash normalization).
pub fn base_url(&self) -> &str {
&self.auth_config.base_url
}
/// Returns the underlying `reqwest` client for crate-internal callers.
pub(crate) fn http_client(&self) -> &Client {
&self.client
}
/// Alias for [`RainyClient::get_available_models`].
pub async fn list_available_models(&self) -> Result<AvailableModels> {
self.get_available_models().await
}
/// Sends a single request to `endpoint` under `/api/v1` and deserializes the reply.
///
/// `body`, when present, is sent as JSON. With the `rate-limiting` feature enabled,
/// the call first waits for the limiter. This helper performs exactly one attempt;
/// it does not go through `retry_with_backoff`.
///
/// # Errors
///
/// Returns any error produced while sending the request or by `handle_response`.
pub(crate) async fn make_request<T: serde::de::DeserializeOwned>(
    &self,
    method: reqwest::Method,
    endpoint: &str,
    body: Option<serde_json::Value>,
) -> Result<T> {
    #[cfg(feature = "rate-limiting")]
    if let Some(limiter) = &self.rate_limiter {
        limiter.until_ready().await;
    }

    let url = self.api_v1_url(endpoint);
    let builder = self.client.request(method, &url);
    let builder = match body {
        Some(payload) => builder.json(&payload),
        None => builder,
    };

    let response = builder.send().await?;
    self.handle_response(response).await
}
}
/// Manual `Debug` that prints only the base URL, timeout and maximum retry count;
/// the API key, HTTP client and rate limiter are left out of the output.
impl std::fmt::Debug for RainyClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RainyClient");
        out.field("base_url", &self.auth_config.base_url);
        out.field("timeout", &self.auth_config.timeout_seconds);
        out.field("max_retries", &self.retry_config.max_retries);
        out.finish()
    }
}