rp_supabase_client/
lib.rs

1use core::marker::PhantomData;
2
3use futures::{Stream, StreamExt as _};
4use rp_postgrest::{Postgrest, reqwest};
5use rp_supabase_auth::error::AuthError;
6use rp_supabase_auth::jwt_stream::SupabaseAuthConfig;
7use rp_supabase_auth::types::{AccessTokenResponseSchema, LoginCredentials};
8use rp_supabase_auth::url;
9use tracing::instrument;
10pub use {rp_postgrest, rp_postgrest_error, rp_supabase_auth};
11
12pub struct PostgerstResponse<T> {
13    response: reqwest::Response,
14    result: PhantomData<T>,
15}
16
/// Name of the HTTP header under which the Supabase API key is sent.
///
/// [`anonymous_client`] inserts this header into every client it builds.
pub const SUPABASE_KEY: &str = "apikey";
18
19/// Create a new authenticated supabase client stream
20///
21/// # Errors
22/// - the client cannot be constructed
23/// - the login url is invalid
24pub fn new_authenticated(
25    config: SupabaseAuthConfig,
26    login_info: LoginCredentials,
27) -> Result<
28    impl Stream<
29        Item = Result<(rp_postgrest::Postgrest, AccessTokenResponseSchema), SupabaseClientError>,
30    >,
31    SupabaseClientError,
32> {
33    let base = anonymous_client(config.api_key.clone(), &config.url)?;
34    let auth_stream = rp_supabase_auth::jwt_stream::JwtStream::new(config).sign_in(login_info)?;
35    let client_stream = auth_stream.map(move |item| {
36        item.map(|item| {
37            let mut client = base.clone();
38            if let Some(access_token) = item.access_token.as_ref() {
39                client = client.auth(access_token);
40            }
41            (client, item)
42        })
43        .map_err(SupabaseClientError::from)
44    });
45
46    Ok(client_stream)
47}
48
49/// Create a new anonymous supabase client
50///
51/// # Errors
52/// - the url is invalid
53pub fn anonymous_client(api_key: String, url: &url::Url) -> Result<Postgrest, SupabaseClientError> {
54    let url = url.join("rest/v1/")?;
55    let postgrest = rp_postgrest::Postgrest::new(url).insert_header(SUPABASE_KEY, api_key);
56    Ok(postgrest)
57}
58
59#[derive(thiserror::Error, Debug)]
60pub enum SupabaseClientError {
61    #[error("Jwt Stream closed unexpectedly")]
62    JwtStreamClosedUnexpectedly,
63    #[error("Refresh stream error")]
64    RefreshStreamError(#[from] rp_supabase_auth::jwt_stream::RefreshStreamError),
65    #[error("Auth sign in error")]
66    AuthSignInError(#[from] rp_supabase_auth::jwt_stream::SignInError),
67    #[error("Url parse error {0}")]
68    UrlParseError(#[from] url::ParseError),
69    #[error("Auth error {0}")]
70    AuthError(#[from] AuthError),
71}
72
73impl<T> PostgerstResponse<T> {
74    #[must_use]
75    pub const fn new(response: reqwest::Response) -> Self {
76        Self {
77            response,
78            result: PhantomData,
79        }
80    }
81
82    /// Only check if the returtned HTTP response is of error type; don't parse the data
83    ///
84    /// Useful when you don't care about the actual response besides if it was an error.
85    #[instrument(name = "response_ok", skip(self), err)]
86    pub fn ok(self) -> Result<(), IntrenalError> {
87        self.response.error_for_status()?;
88        Ok(())
89    }
90
91    /// Check if the returned HTTP result is an error;
92    /// Only parse the error type if we received an error.
93    ///
94    /// Useful when you don't care about the actual response besides if it was an error.
95    #[instrument(name = "parse_response_json_err", skip(self), err)]
96    pub async fn json_err(
97        self,
98    ) -> Result<Result<(), rp_postgrest_error::PostgrestUtilError>, IntrenalError> {
99        let status = self.response.status();
100        if status.is_success() {
101            Ok(Ok(()))
102        } else {
103            let bytes = self.response.bytes().await?.to_vec();
104            let error = parse_postgrest_error(bytes, status)?;
105            let error = rp_postgrest_error::PostgrestUtilError::from_error_response(error);
106            Ok(Err(error))
107        }
108    }
109
110    /// Parse the response json
111    #[instrument(name = "parse_response_json", skip(self), err)]
112    pub async fn json(
113        self,
114    ) -> Result<Result<T, rp_postgrest_error::PostgrestUtilError>, IntrenalError>
115    where
116        T: serde::de::DeserializeOwned,
117    {
118        let status = self.response.status();
119        let mut bytes = self.response.bytes().await?.to_vec();
120        if status.is_success() {
121            let json = String::from_utf8_lossy(bytes.as_ref());
122            tracing::debug!(response_body = %json, "Response JSON");
123
124            let result = simd_json::from_slice::<T>(bytes.as_mut())?;
125            Ok(Ok(result))
126        } else {
127            let error = parse_postgrest_error(bytes, status)?;
128            let error = rp_postgrest_error::PostgrestUtilError::from_error_response(error);
129            Ok(Err(error))
130        }
131    }
132}
133
134fn parse_postgrest_error<E>(
135    mut bytes: Vec<u8>,
136    status: reqwest::StatusCode,
137) -> Result<E, IntrenalError>
138where
139    E: serde::de::DeserializeOwned,
140{
141    let json = String::from_utf8_lossy(bytes.as_ref());
142    tracing::error!(
143        status = %status,
144        body = %json,
145        "Failed to execute request"
146    );
147
148    let error = simd_json::from_slice::<E>(bytes.as_mut())?;
149    Ok(error)
150}
151
152#[derive(thiserror::Error, Debug)]
153pub enum IntrenalError {
154    #[error("simd json error {0}")]
155    SimdJsonError(#[from] simd_json::Error),
156    #[error("reqwest {0}")]
157    ReqwestError(#[from] reqwest::Error),
158}