supabase_client_query/
postgrest_execute.rs1use reqwest::header::{HeaderMap, HeaderValue};
2use serde::de::DeserializeOwned;
3use serde_json::Value as JsonValue;
4
5use supabase_client_core::{StatusCode, SupabaseError, SupabaseResponse};
6
7use crate::sql::{CountOption, SqlOperation, SqlParts};
8
9pub async fn execute_rest<T: DeserializeOwned + Send>(
11 http: &reqwest::Client,
12 method: reqwest::Method,
13 url: &str,
14 mut headers: HeaderMap,
15 body: Option<JsonValue>,
16 api_key: &str,
17 schema: &str,
18 parts: &SqlParts,
19) -> SupabaseResponse<T> {
20 headers.insert("apikey", HeaderValue::from_str(api_key).unwrap());
22 headers.insert(
23 "Authorization",
24 HeaderValue::from_str(&format!("Bearer {}", api_key)).unwrap(),
25 );
26
27 if parts.schema_override.is_none() && schema != "public" {
29 match parts.operation {
30 SqlOperation::Select => {
31 headers
32 .entry("Accept-Profile")
33 .or_insert_with(|| HeaderValue::from_str(schema).unwrap());
34 }
35 _ => {
36 headers
37 .entry("Content-Profile")
38 .or_insert_with(|| HeaderValue::from_str(schema).unwrap());
39 }
40 }
41 }
42
43 headers
45 .entry("Accept")
46 .or_insert(HeaderValue::from_static("application/json"));
47
48 tracing::debug!(
49 method = %method,
50 url = %url,
51 "Executing PostgREST request"
52 );
53
54 let mut request = http.request(method.clone(), url).headers(headers);
55
56 if let Some(body) = body {
57 request = request.json(&body);
58 }
59
60 let response = match request.send().await {
61 Ok(r) => r,
62 Err(e) => return SupabaseResponse::error(SupabaseError::Http(e.to_string())),
63 };
64
65 let status_code = response.status().as_u16();
66 let resp_headers = response.headers().clone();
67
68 if method == reqwest::Method::HEAD || parts.head {
70 let count = parse_count_from_headers(&resp_headers);
71 if status_code >= 200 && status_code < 300 {
72 let mut resp = SupabaseResponse::<T>::ok(Vec::new());
73 if let Some(c) = count {
74 resp.count = Some(c);
75 }
76 return resp;
77 } else {
78 return SupabaseResponse::error(SupabaseError::postgrest(
79 status_code,
80 format!("HEAD request failed with status {}", status_code),
81 None,
82 ));
83 }
84 }
85
86 let body_text = match response.text().await {
88 Ok(t) => t,
89 Err(e) => return SupabaseResponse::error(SupabaseError::Http(e.to_string())),
90 };
91
92 if status_code >= 400 {
94 return parse_error_response(status_code, &body_text);
95 }
96
97 if status_code == 204 || body_text.is_empty() {
99 let count = parse_count_from_headers(&resp_headers);
100 let mut resp = SupabaseResponse::<T>::no_content();
101 resp.count = count;
102 return resp;
103 }
104
105 let count = parse_count_from_headers(&resp_headers);
107
108 if parts.single {
110 match serde_json::from_str::<T>(&body_text) {
112 Ok(item) => {
113 let mut resp = build_response_from_operation(vec![item], parts);
114 if let Some(c) = count {
115 resp.count = Some(c);
116 }
117 resp
118 }
119 Err(e) => SupabaseResponse::error(SupabaseError::Serialization(format!(
120 "Failed to parse single response: {}",
121 e
122 ))),
123 }
124 } else {
125 match serde_json::from_str::<Vec<T>>(&body_text) {
127 Ok(data) => {
128 let mut resp = build_response_from_operation(data, parts);
129 if let Some(c) = count {
130 resp.count = Some(c);
131 }
132 if parts.maybe_single {
134 match resp.data.len() {
135 0 | 1 => {}
136 n => return SupabaseResponse::error(SupabaseError::MultipleRows(n)),
137 }
138 }
139 resp
140 }
141 Err(_) => {
142 match serde_json::from_str::<T>(&body_text) {
144 Ok(item) => {
145 let mut resp = build_response_from_operation(vec![item], parts);
146 if let Some(c) = count {
147 resp.count = Some(c);
148 }
149 resp
150 }
151 Err(_) => {
152 match serde_json::from_str::<JsonValue>(&body_text) {
156 Ok(scalar) if !scalar.is_array() && !scalar.is_object() => {
157 let wrapped = format!(
158 "[{{\"{}\": {}}}]",
159 parts.table, body_text
160 );
161 match serde_json::from_str::<Vec<T>>(&wrapped) {
162 Ok(data) => {
163 let mut resp =
164 build_response_from_operation(data, parts);
165 if let Some(c) = count {
166 resp.count = Some(c);
167 }
168 resp
169 }
170 Err(e) => SupabaseResponse::error(
171 SupabaseError::Serialization(format!(
172 "Failed to parse scalar response: {}",
173 e
174 )),
175 ),
176 }
177 }
178 _ => SupabaseResponse::error(SupabaseError::Serialization(
179 format!(
180 "Failed to parse response: {}",
181 body_text
182 ),
183 )),
184 }
185 }
186 }
187 }
188 }
189 }
190}
191
192fn build_response_from_operation<T>(data: Vec<T>, parts: &SqlParts) -> SupabaseResponse<T> {
193 let status = match parts.operation {
194 SqlOperation::Insert | SqlOperation::Upsert => StatusCode::Created,
195 _ => StatusCode::Ok,
196 };
197
198 let count = if parts.count != CountOption::None {
199 Some(data.len() as i64)
200 } else {
201 None
202 };
203
204 SupabaseResponse {
205 data,
206 error: None,
207 count,
208 status,
209 }
210}
211
212fn parse_count_from_headers(headers: &HeaderMap) -> Option<i64> {
213 headers
215 .get("content-range")
216 .and_then(|v| v.to_str().ok())
217 .and_then(|s| {
218 if let Some(slash_pos) = s.rfind('/') {
219 let count_str = &s[slash_pos + 1..];
220 if count_str == "*" {
221 None
222 } else {
223 count_str.parse::<i64>().ok()
224 }
225 } else {
226 None
227 }
228 })
229}
230
231fn parse_error_response<T>(status_code: u16, body: &str) -> SupabaseResponse<T> {
232 if let Ok(error_obj) = serde_json::from_str::<JsonValue>(body) {
234 let message = error_obj
235 .get("message")
236 .and_then(|v| v.as_str())
237 .unwrap_or("Unknown error")
238 .to_string();
239 let code = error_obj
240 .get("code")
241 .and_then(|v| v.as_str())
242 .map(|s| s.to_string());
243
244 SupabaseResponse::error(SupabaseError::postgrest(status_code, message, code))
245 } else {
246 SupabaseResponse::error(SupabaseError::postgrest(
247 status_code,
248 body.to_string(),
249 None,
250 ))
251 }
252}