use reqwest::{header::HeaderMap, Method, RequestBuilder, Response, StatusCode};
use serde_json::Value;
use tracing::{debug, warn};
use crate::error::{AuthError, PostgrestError, Result, StorageError, SupabaseError};
use crate::SupabaseClient;
/// Identifies the Supabase service a request is aimed at.
///
/// Within this module the value selects which service-specific error type
/// `decode_error` produces for a non-success response.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Service {
    /// Failed responses are decoded as `PostgrestError`.
    Postgrest,
    /// Failed responses are decoded as `AuthError`.
    Auth,
    /// Failed responses are decoded as `StorageError`.
    Storage,
    /// Failed responses are decoded as `StorageError`, the same as [`Service::Storage`].
    Functions,
}
/// HTTP verbs accepted by the request helpers in this module.
///
/// Convert to the transport-level type with `HttpMethod::as_reqwest`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HttpMethod {
    /// `GET`
    Get,
    /// `POST`
    Post,
    /// `PUT`
    Put,
    /// `PATCH`
    Patch,
    /// `DELETE`
    Delete,
}
impl HttpMethod {
pub fn as_reqwest(&self) -> Method {
match self {
Self::Get => Method::GET,
Self::Post => Method::POST,
Self::Put => Method::PUT,
Self::Patch => Method::PATCH,
Self::Delete => Method::DELETE,
}
}
}
/// Per-request settings consumed by `SupabaseClient::build_request` and the
/// request helpers built on top of it.
#[derive(Clone, Debug, Default)]
pub struct RequestOptions {
    /// Service used to decode a failed response; when `None`, each request
    /// helper falls back to its own default service.
    pub service: Option<Service>,
    /// Requests `resolution=merge-duplicates` through the `Prefer` header.
    pub upsert: bool,
    /// `Prefer` directives; they are joined with `,` into a single header.
    pub prefer: Vec<String>,
    /// Extra `(name, value)` headers, added after the client's own extra headers.
    pub headers: Vec<(String, String)>,
    /// Bearer token to send instead of the client's effective bearer token.
    pub bearer_override: Option<String>,
}
impl RequestOptions {
pub fn postgrest() -> Self {
Self { service: Some(Service::Postgrest), ..Self::default() }
}
pub fn auth() -> Self {
Self { service: Some(Service::Auth), ..Self::default() }
}
pub fn storage() -> Self {
Self { service: Some(Service::Storage), ..Self::default() }
}
}
impl SupabaseClient {
    /// Builds a [`RequestBuilder`] for `url` carrying the client's standard headers.
    ///
    /// Headers are added in this order:
    /// 1. `apikey`, then bearer authorization using `opts.bearer_override` when
    ///    set and `self.effective_bearer()` otherwise;
    /// 2. `Accept-Profile` and `Content-Profile` when the client has a schema;
    /// 3. the client's `extra_headers`, followed by `opts.headers`;
    /// 4. one `Prefer` header holding every `opts.prefer` value joined by `,`.
    ///
    /// When `opts.upsert` is set, `resolution=merge-duplicates` is appended to
    /// the `Prefer` values unless the caller already supplied a `resolution=`
    /// directive, so the header never carries two contradictory resolutions.
    pub fn build_request(
        &self,
        method: Method,
        url: &str,
        opts: &RequestOptions,
    ) -> RequestBuilder {
        let base = self
            .http
            .request(method, url)
            .header("apikey", &self.api_key);
        // `bearer_auth` only needs `Display`, so the override is borrowed
        // instead of cloned.
        let mut req = match &opts.bearer_override {
            Some(token) => base.bearer_auth(token),
            None => base.bearer_auth(self.effective_bearer()),
        };
        if let Some(schema) = &self.schema {
            req = req
                .header("Accept-Profile", schema)
                .header("Content-Profile", schema);
        }
        for (k, v) in &self.extra_headers {
            req = req.header(k.as_str(), v);
        }
        for (k, v) in &opts.headers {
            req = req.header(k.as_str(), v);
        }
        let mut prefer: Vec<&str> = opts.prefer.iter().map(String::as_str).collect();
        let has_resolution = prefer
            .iter()
            .any(|directive| directive.contains("resolution="));
        if opts.upsert && !has_resolution {
            prefer.push("resolution=merge-duplicates");
        }
        if !prefer.is_empty() {
            req = req.header("Prefer", prefer.join(","));
        }
        req
    }

    /// Sends a JSON request using [`RequestOptions::postgrest`] defaults.
    ///
    /// `upsert` is forwarded as [`RequestOptions::upsert`]. See
    /// [`Self::request_with`] for the return value and error behavior.
    pub async fn request(
        &self,
        path: &str,
        method: HttpMethod,
        payload: Option<Value>,
        upsert: bool,
    ) -> Result<Value> {
        let opts = RequestOptions {
            upsert,
            ..RequestOptions::postgrest()
        };
        self.request_with(path, method, payload, &opts).await
    }

    /// Sends a JSON request and decodes the response body as JSON.
    ///
    /// Returns [`Value::Null`] when a successful response has an empty body.
    ///
    /// # Errors
    /// * Transport failures from sending the request or reading the body.
    /// * A service error built by `decode_error` for any non-success status,
    ///   using `opts.service` or [`Service::Postgrest`] when unset.
    /// * [`SupabaseError::Decode`] when a successful body is not valid JSON.
    pub async fn request_with(
        &self,
        path: &str,
        method: HttpMethod,
        payload: Option<Value>,
        opts: &RequestOptions,
    ) -> Result<Value> {
        let resp = self.send(path, method, payload, opts).await?;
        Self::json_or_error(resp, opts.service.unwrap_or(Service::Postgrest)).await
    }

    /// Sends a raw byte body with the given `Content-Type` and decodes the
    /// response body as JSON.
    ///
    /// Uses the same 429 retry policy and debug tracing as the JSON request
    /// helpers. Returns [`Value::Null`] when a successful response has an
    /// empty body.
    ///
    /// # Errors
    /// * Transport failures from sending the request or reading the body.
    /// * A service error built by `decode_error` for any non-success status,
    ///   using `opts.service` or [`Service::Storage`] when unset.
    /// * [`SupabaseError::Decode`] when a successful body is not valid JSON.
    pub async fn request_bytes(
        &self,
        path: &str,
        method: HttpMethod,
        body: Vec<u8>,
        content_type: &str,
        opts: &RequestOptions,
    ) -> Result<Value> {
        let resp = self
            .send_with_retry(path, method, opts, |req| {
                // The body is cloned per attempt because each send consumes it.
                req.header("Content-Type", content_type).body(body.clone())
            })
            .await?;
        Self::json_or_error(resp, opts.service.unwrap_or(Service::Storage)).await
    }

    /// Sends a body-less request and returns the raw [`Response`] on success,
    /// leaving the body unread for the caller to consume.
    ///
    /// # Errors
    /// On a non-success status the body is read and turned into a service
    /// error by `decode_error`, using `opts.service` or [`Service::Storage`]
    /// when unset. Transport failures are propagated.
    pub async fn request_streaming(
        &self,
        path: &str,
        method: HttpMethod,
        opts: &RequestOptions,
    ) -> Result<Response> {
        let resp = self.send(path, method, None, opts).await?;
        let status = resp.status();
        if status.is_success() {
            return Ok(resp);
        }
        let body = resp.text().await?;
        Err(decode_error(
            opts.service.unwrap_or(Service::Storage),
            status,
            &body,
        ))
    }

    /// Sends a JSON request and returns the status, headers and raw body text
    /// of a successful response without decoding the body.
    ///
    /// # Errors
    /// A non-success status becomes a service error built by `decode_error`,
    /// using `opts.service` or [`Service::Postgrest`] when unset. Transport
    /// failures are propagated.
    pub async fn request_full(
        &self,
        path: &str,
        method: HttpMethod,
        payload: Option<Value>,
        opts: &RequestOptions,
    ) -> Result<(StatusCode, HeaderMap, String)> {
        let resp = self.send(path, method, payload, opts).await?;
        let (status, headers, body) = read_response(resp).await?;
        if !status.is_success() {
            return Err(decode_error(
                opts.service.unwrap_or(Service::Postgrest),
                status,
                &body,
            ));
        }
        Ok((status, headers, body))
    }

    /// Sends a request with an optional JSON payload, retrying on 429.
    ///
    /// The response is returned whatever its status; callers decide how to
    /// treat non-success statuses.
    async fn send(
        &self,
        path: &str,
        method: HttpMethod,
        payload: Option<Value>,
        opts: &RequestOptions,
    ) -> Result<Response> {
        self.send_with_retry(path, method, opts, |req| match &payload {
            Some(body) => req.json(body),
            None => req,
        })
        .await
    }

    /// Shared send loop: builds the request for `self.url + path`, lets
    /// `attach_body` add the payload, and retries on `429 Too Many Requests`.
    ///
    /// Up to `self.retry.max_retries` retries are made. The delay before retry
    /// number `attempt + 1` is `base_backoff * 2^attempt`, falling back to
    /// `base_backoff` if that product overflows. If the final attempt is still
    /// answered with 429, that response is returned unchanged so the caller
    /// decodes it like any other error status.
    async fn send_with_retry<F>(
        &self,
        path: &str,
        method: HttpMethod,
        opts: &RequestOptions,
        attach_body: F,
    ) -> Result<Response>
    where
        F: Fn(RequestBuilder) -> RequestBuilder,
    {
        let max_retries: u32 = self.retry.max_retries;
        let base_backoff = self.retry.base_backoff;
        let url = format!("{}{}", self.url, path);
        debug!(target: "supabase", %url, ?method, service = ?opts.service, "sending request");
        let mut attempt: u32 = 0;
        loop {
            // A builder is consumed by `send`, so it is rebuilt for every attempt.
            let req = attach_body(self.build_request(method.as_reqwest(), &url, opts));
            let resp = req.send().await?;
            let status = resp.status();
            debug!(target: "supabase", %url, status = status.as_u16(), attempt, "received response");
            if status != StatusCode::TOO_MANY_REQUESTS || attempt >= max_retries {
                return Ok(resp);
            }
            let backoff = base_backoff
                .checked_mul(2_u32.saturating_pow(attempt))
                .unwrap_or(base_backoff);
            warn!(target: "supabase", attempt, ?backoff, "429 Too Many Requests; backing off");
            tokio::time::sleep(backoff).await;
            // Cannot overflow: this point is reached only while `attempt < max_retries`.
            attempt += 1;
        }
    }

    /// Reads `resp` and decodes it as JSON, or as a `service` error when the
    /// status is not a success. An empty successful body yields [`Value::Null`].
    async fn json_or_error(resp: Response, service: Service) -> Result<Value> {
        let status = resp.status();
        let body = resp.text().await?;
        if !status.is_success() {
            return Err(decode_error(service, status, &body));
        }
        if body.is_empty() {
            return Ok(Value::Null);
        }
        serde_json::from_str(&body).map_err(|e| SupabaseError::Decode {
            message: e.to_string(),
            body,
        })
    }
}
/// Consumes `resp` and returns its status, a copy of its headers, and its
/// body read as text.
///
/// The status and headers are captured first because reading the body takes
/// ownership of the response. A failure while reading the body is propagated.
async fn read_response(resp: Response) -> Result<(StatusCode, HeaderMap, String)> {
    let meta = (resp.status(), resp.headers().clone());
    let text = resp.text().await?;
    Ok((meta.0, meta.1, text))
}
/// Converts a non-success response into the error type of `service`.
///
/// The body is first parsed as that service's structured error JSON; on
/// success the parsed error is returned with its status overwritten by the
/// HTTP status. Otherwise a fallback error is built whose message is the raw
/// body, or the status line text when the body is empty.
///
/// [`Service::Functions`] shares the [`Service::Storage`] error type.
pub(crate) fn decode_error(service: Service, status: StatusCode, body: &str) -> SupabaseError {
    let http_status = status.as_u16();
    // Evaluated lazily: only the fallback paths need to allocate a message.
    let raw_message = || {
        if body.is_empty() {
            status.to_string()
        } else {
            body.to_string()
        }
    };
    match service {
        Service::Postgrest => {
            let err = match serde_json::from_str::<PostgrestError>(body) {
                Ok(mut parsed) => {
                    parsed.status = http_status;
                    parsed
                }
                Err(_) => PostgrestError {
                    code: None,
                    message: raw_message(),
                    details: None,
                    hint: None,
                    status: http_status,
                },
            };
            SupabaseError::Postgrest(err)
        }
        Service::Auth => {
            let err = match serde_json::from_str::<AuthError>(body) {
                Ok(mut parsed) => {
                    parsed.status = Some(http_status);
                    parsed
                }
                Err(_) => AuthError {
                    code: None,
                    error_code: None,
                    message: raw_message(),
                    status: Some(http_status),
                },
            };
            SupabaseError::Auth(err)
        }
        Service::Storage | Service::Functions => {
            let err = match serde_json::from_str::<StorageError>(body) {
                Ok(mut parsed) => {
                    parsed.status = Some(http_status);
                    parsed
                }
                Err(_) => StorageError {
                    status_code: Some(http_status.to_string()),
                    error: None,
                    message: raw_message(),
                    status: Some(http_status),
                },
            };
            SupabaseError::Storage(err)
        }
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Every `HttpMethod` variant maps to the matching `reqwest::Method`.
    #[test]
    fn http_method_as_reqwest_all_variants() {
        assert_eq!(HttpMethod::Get.as_reqwest(), reqwest::Method::GET);
        assert_eq!(HttpMethod::Post.as_reqwest(), reqwest::Method::POST);
        assert_eq!(HttpMethod::Put.as_reqwest(), reqwest::Method::PUT);
        assert_eq!(HttpMethod::Patch.as_reqwest(), reqwest::Method::PATCH);
        assert_eq!(HttpMethod::Delete.as_reqwest(), reqwest::Method::DELETE);
    }

    /// `HttpMethod` is `Copy` and compares by variant.
    #[test]
    fn http_method_eq_and_copy() {
        let m = HttpMethod::Post;
        // Using `m` after binding it to `copied` only compiles because
        // `HttpMethod` is `Copy`; comparing `m` with itself would not show that.
        let copied = m;
        assert_eq!(m, copied);
        assert_ne!(HttpMethod::Get, HttpMethod::Post);
    }

    /// `Service` compares by variant.
    #[test]
    fn service_eq() {
        assert_eq!(Service::Postgrest, Service::Postgrest);
        assert_ne!(Service::Auth, Service::Storage);
        assert_ne!(Service::Functions, Service::Postgrest);
    }

    /// `postgrest()` sets only the service and leaves every other field at its default.
    #[test]
    fn request_options_postgrest() {
        let opts = RequestOptions::postgrest();
        assert_eq!(opts.service, Some(Service::Postgrest));
        assert!(!opts.upsert);
        assert!(opts.prefer.is_empty());
        assert!(opts.headers.is_empty());
        assert!(opts.bearer_override.is_none());
    }

    /// `auth()` tags the options with `Service::Auth`.
    #[test]
    fn request_options_auth() {
        let opts = RequestOptions::auth();
        assert_eq!(opts.service, Some(Service::Auth));
    }

    /// `storage()` tags the options with `Service::Storage`.
    #[test]
    fn request_options_storage() {
        let opts = RequestOptions::storage();
        assert_eq!(opts.service, Some(Service::Storage));
    }

    /// Plain `default()` leaves the service unset.
    #[test]
    fn request_options_default_has_no_service() {
        let opts = RequestOptions::default();
        assert!(opts.service.is_none());
    }
}