1use base64::Engine;
6use bytes::{BufMut, Bytes, BytesMut};
7use futures_util::{Stream, StreamExt};
8use reqwest::{Client, Response, StatusCode};
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::HashMap;
12use std::pin::Pin;
13use std::time::Duration;
14use thiserror::Error;
15use url::Url;
16
17#[derive(Debug, Clone, Deserialize)]
19pub struct FunctionErrorDetails {
20 pub message: Option<String>,
21 pub status: Option<u16>,
22 pub code: Option<String>,
23 pub details: Option<Value>,
24}
25
26#[derive(Debug, Error)]
28pub enum FunctionsError {
29 #[error("Request error: {0}")]
30 RequestError(#[from] reqwest::Error),
31
32 #[error("URL parse error: {0}")]
33 UrlError(#[from] url::ParseError),
34
35 #[error("JSON error: {0}")]
36 JsonError(#[from] serde_json::Error),
37
38 #[error("Function error (status: {status}): {message}")]
39 FunctionError {
40 message: String,
41 status: StatusCode,
42 details: Option<FunctionErrorDetails>,
43 },
44
45 #[error("Timeout error: Function execution exceeded timeout limit")]
46 TimeoutError,
47
48 #[error("Invalid response: {0}")]
49 InvalidResponse(String),
50}
51
52impl FunctionsError {
53 pub fn new(message: String) -> Self {
54 Self::FunctionError {
55 message,
56 status: StatusCode::INTERNAL_SERVER_ERROR,
57 details: None,
58 }
59 }
60
61 pub fn from_response(response: &Response) -> Self {
62 Self::FunctionError {
63 message: format!("Function returned error status: {}", response.status()),
64 status: response.status(),
65 details: None,
66 }
67 }
68
69 pub fn with_details(response: &Response, details: FunctionErrorDetails) -> Self {
70 Self::FunctionError {
71 message: details.message.as_ref().map_or_else(
72 || format!("Function returned error status: {}", response.status()),
73 |msg| msg.clone(),
74 ),
75 status: response.status(),
76 details: Some(details),
77 }
78 }
79}
80
81pub type Result<T> = std::result::Result<T, FunctionsError>;
82
83#[derive(Clone, Debug)]
85pub struct FunctionOptions {
86 pub headers: Option<HashMap<String, String>>,
88
89 pub timeout_seconds: Option<u64>,
91
92 pub response_type: ResponseType,
94
95 pub content_type: Option<String>,
97}
98
99impl Default for FunctionOptions {
100 fn default() -> Self {
101 Self {
102 headers: None,
103 timeout_seconds: None,
104 response_type: ResponseType::Json,
105 content_type: None,
106 }
107 }
108}
109
110#[derive(Clone, Copy, Debug, PartialEq, Eq)]
112pub enum ResponseType {
113 Json,
115
116 Text,
118
119 Binary,
121
122 Stream,
124}
125
126pub type ByteStream = Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>;
128
129#[derive(Debug, Clone)]
131pub struct FunctionResponse<T> {
132 pub data: T,
134
135 pub status: StatusCode,
137
138 pub headers: HashMap<String, String>,
140}
141
142pub struct FunctionsClient {
144 base_url: String,
145 api_key: String,
146 http_client: Client,
147}
148
149pub struct FunctionRequest<'a, T> {
151 client: &'a FunctionsClient,
152 function_name: String,
153 _response_type: std::marker::PhantomData<T>,
154}
155
156impl<'a, T: DeserializeOwned> FunctionRequest<'a, T> {
157 pub async fn execute<B: Serialize>(
159 &self,
160 body: Option<B>,
161 options: Option<FunctionOptions>,
162 ) -> Result<T> {
163 let result = self
164 .client
165 .invoke::<T, B>(&self.function_name, body, options)
166 .await?;
167 Ok(result.data)
168 }
169}
170
171impl FunctionsClient {
172 pub fn new(supabase_url: &str, supabase_key: &str, http_client: Client) -> Self {
174 Self {
175 base_url: supabase_url.to_string(),
176 api_key: supabase_key.to_string(),
177 http_client,
178 }
179 }
180
181 pub async fn invoke<T: DeserializeOwned, B: Serialize>(
183 &self,
184 function_name: &str,
185 body: Option<B>,
186 options: Option<FunctionOptions>,
187 ) -> Result<FunctionResponse<T>> {
188 let opts = options.unwrap_or_default();
189
190 let mut url = Url::parse(&self.base_url)?;
192 url.path_segments_mut()
193 .map_err(|_| FunctionsError::UrlError(url::ParseError::EmptyHost))?
194 .push("functions")
195 .push("v1")
196 .push(function_name);
197
198 let mut request_builder = self
200 .http_client
201 .post(url)
202 .header("apikey", &self.api_key)
203 .header("Authorization", format!("Bearer {}", &self.api_key));
204
205 if let Some(timeout) = opts.timeout_seconds {
207 request_builder = request_builder.timeout(Duration::from_secs(timeout));
208 }
209
210 if let Some(content_type) = opts.content_type {
212 request_builder = request_builder.header("Content-Type", content_type);
213 }
214
215 if let Some(headers) = opts.headers {
217 for (key, value) in headers {
218 request_builder = request_builder.header(key, value);
219 }
220 }
221
222 if let Some(body_data) = body {
224 request_builder = request_builder.json(&body_data);
225 }
226
227 let response = request_builder.send().await.map_err(|e| {
229 if e.is_timeout() {
230 FunctionsError::TimeoutError
231 } else {
232 FunctionsError::from(e)
233 }
234 })?;
235
236 let status = response.status();
238 if !status.is_success() {
239 let status_copy = status;
241
242 let error_body = response
244 .text()
245 .await
246 .unwrap_or_else(|_| "Failed to read error response".to_string());
247
248 if let Ok(error_details) = serde_json::from_str::<FunctionErrorDetails>(&error_body) {
249 return Err(FunctionsError::FunctionError {
250 message: error_details.message.as_ref().map_or_else(
251 || format!("Function returned error status: {}", status_copy),
252 |msg| msg.clone(),
253 ),
254 status: status_copy,
255 details: Some(error_details),
256 });
257 } else {
258 return Err(FunctionsError::FunctionError {
259 message: error_body,
260 status: status_copy,
261 details: None,
262 });
263 }
264 }
265
266 let headers = response
268 .headers()
269 .iter()
270 .map(|(name, value)| (name.to_string(), value.to_str().unwrap_or("").to_string()))
271 .collect::<HashMap<String, String>>();
272
273 match opts.response_type {
275 ResponseType::Json => {
276 let data = response.json::<T>().await.map_err(|e| {
277 FunctionsError::JsonError(serde_json::from_str::<T>("{}").err().unwrap_or_else(
278 || {
279 serde_json::Error::io(std::io::Error::new(
280 std::io::ErrorKind::InvalidData,
281 e.to_string(),
282 ))
283 },
284 ))
285 })?;
286
287 Ok(FunctionResponse {
288 data,
289 status,
290 headers,
291 })
292 }
293 ResponseType::Text => {
294 let text = response.text().await?;
296
297 let data: T = serde_json::from_str(&text).unwrap_or_else(|_| {
299 panic!("Failed to deserialize text response as requested type")
300 });
301
302 Ok(FunctionResponse {
303 data,
304 status,
305 headers,
306 })
307 }
308 ResponseType::Binary => {
309 let bytes = response.bytes().await?;
311
312 let binary_str = base64::engine::general_purpose::STANDARD.encode(&bytes);
314
315 let data: T =
317 serde_json::from_str(&format!("\"{}\"", binary_str)).unwrap_or_else(|_| {
318 panic!("Failed to deserialize binary response as requested type")
319 });
320
321 Ok(FunctionResponse {
322 data,
323 status,
324 headers,
325 })
326 }
327 ResponseType::Stream => {
328 Err(FunctionsError::InvalidResponse(
331 "Stream response type cannot be handled by invoke(). Use invoke_stream() instead.".to_string()
332 ))
333 }
334 }
335 }
336
337 pub async fn invoke_json<T: DeserializeOwned, B: Serialize>(
339 &self,
340 function_name: &str,
341 body: Option<B>,
342 ) -> Result<T> {
343 let options = FunctionOptions {
344 response_type: ResponseType::Json,
345 ..Default::default()
346 };
347
348 let response = self
349 .invoke::<T, B>(function_name, body, Some(options))
350 .await?;
351 Ok(response.data)
352 }
353
354 pub async fn invoke_text<B: Serialize>(
356 &self,
357 function_name: &str,
358 body: Option<B>,
359 ) -> Result<String> {
360 let options = FunctionOptions {
361 response_type: ResponseType::Text,
362 ..Default::default()
363 };
364
365 let response = self
366 .invoke::<String, B>(function_name, body, Some(options))
367 .await?;
368 Ok(response.data)
369 }
370
371 pub async fn invoke_binary<B: Serialize>(
373 &self,
374 function_name: &str,
375 body: Option<B>,
376 ) -> Result<String> {
377 let options = FunctionOptions {
378 response_type: ResponseType::Binary,
379 ..Default::default()
380 };
381
382 let response = self
383 .invoke::<String, B>(function_name, body, Some(options))
384 .await?;
385 Ok(response.data)
386 }
387
388 pub async fn invoke_stream<B: Serialize>(
390 &self,
391 function_name: &str,
392 body: Option<B>,
393 options: Option<FunctionOptions>,
394 ) -> Result<ByteStream> {
395 let opts = options.unwrap_or_else(|| FunctionOptions {
396 response_type: ResponseType::Stream,
397 ..Default::default()
398 });
399
400 let mut url = Url::parse(&self.base_url)?;
402 url.path_segments_mut()
403 .map_err(|_| FunctionsError::UrlError(url::ParseError::EmptyHost))?
404 .push("functions")
405 .push("v1")
406 .push(function_name);
407
408 let mut request_builder = self
410 .http_client
411 .post(url)
412 .header("apikey", &self.api_key)
413 .header("Authorization", format!("Bearer {}", &self.api_key));
414
415 if let Some(timeout) = opts.timeout_seconds {
417 request_builder = request_builder.timeout(Duration::from_secs(timeout));
418 }
419
420 if let Some(content_type) = opts.content_type {
422 request_builder = request_builder.header("Content-Type", content_type);
423 } else {
424 request_builder = request_builder.header("Content-Type", "application/json");
426 }
427
428 if let Some(headers) = opts.headers {
430 for (key, value) in headers {
431 request_builder = request_builder.header(key, value);
432 }
433 }
434
435 if let Some(body_data) = body {
437 request_builder = request_builder.json(&body_data);
438 }
439
440 let response = request_builder.send().await.map_err(|e| {
442 if e.is_timeout() {
443 FunctionsError::TimeoutError
444 } else {
445 FunctionsError::from(e)
446 }
447 })?;
448
449 let status = response.status();
451 if !status.is_success() {
452 let status_copy = status;
454
455 let error_body = response
457 .text()
458 .await
459 .unwrap_or_else(|_| "Failed to read error response".to_string());
460
461 if let Ok(error_details) = serde_json::from_str::<FunctionErrorDetails>(&error_body) {
462 return Err(FunctionsError::FunctionError {
463 message: error_details.message.as_ref().map_or_else(
464 || format!("Function returned error status: {}", status_copy),
465 |msg| msg.clone(),
466 ),
467 status: status_copy,
468 details: Some(error_details),
469 });
470 } else {
471 return Err(FunctionsError::FunctionError {
472 message: error_body,
473 status: status_copy,
474 details: None,
475 });
476 }
477 }
478
479 Ok(Box::pin(
481 response
482 .bytes_stream()
483 .map(|result| result.map_err(FunctionsError::from)),
484 ))
485 }
486
487 pub async fn invoke_json_stream<B: Serialize>(
489 &self,
490 function_name: &str,
491 body: Option<B>,
492 options: Option<FunctionOptions>,
493 ) -> Result<Pin<Box<dyn Stream<Item = Result<Value>> + Send + '_>>> {
494 let byte_stream = self.invoke_stream(function_name, body, options).await?;
495 let json_stream = self.byte_stream_to_json(byte_stream);
496 Ok(json_stream)
497 }
498
499 fn byte_stream_to_json(
501 &self,
502 stream: ByteStream,
503 ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send + '_>> {
504 Box::pin(async_stream::stream! {
505 let mut line_stream = self.stream_to_lines(stream);
506
507 while let Some(line_result) = line_stream.next().await {
508 match line_result {
509 Ok(line) => {
510 if line.trim().is_empty() {
512 continue;
513 }
514
515 match serde_json::from_str::<Value>(&line) {
517 Ok(json_value) => {
518 yield Ok(json_value);
519 },
520 Err(err) => {
521 yield Err(FunctionsError::JsonError(err));
522 }
523 }
524 },
525 Err(err) => {
526 yield Err(err);
527 break;
528 }
529 }
530 }
531 })
532 }
533
534 pub fn stream_to_lines(
536 &self,
537 stream: ByteStream,
538 ) -> Pin<Box<dyn Stream<Item = Result<String>> + Send + '_>> {
539 Box::pin(async_stream::stream! {
540 let mut buf = BytesMut::new();
541
542 tokio::pin!(stream);
544 while let Some(chunk_result) = stream.next().await {
545 match chunk_result {
546 Ok(chunk) => {
547 buf.extend_from_slice(&chunk);
548
549 while let Some(i) = buf.iter().position(|&b| b == b'\n') {
551 let line = if i > 0 && buf[i - 1] == b'\r' {
552 let line = String::from_utf8_lossy(&buf[..i - 1]).to_string();
554 unsafe { buf.advance_mut(i + 1); }
555 line
556 } else {
557 let line = String::from_utf8_lossy(&buf[..i]).to_string();
559 unsafe { buf.advance_mut(i + 1); }
560 line
561 };
562
563 yield Ok(line);
564 }
565 },
566 Err(e) => {
567 yield Err(FunctionsError::from(e));
568 break;
569 }
570 }
571 }
572
573 if !buf.is_empty() {
575 let line = String::from_utf8_lossy(&buf).to_string();
576 yield Ok(line);
577 }
578 })
579 }
580
581 pub fn create_request<T: DeserializeOwned>(
583 &self,
584 function_name: &str,
585 ) -> FunctionRequest<'_, T> {
586 FunctionRequest {
587 client: self,
588 function_name: function_name.to_string(),
589 _response_type: std::marker::PhantomData,
590 }
591 }
592}
593
594#[cfg(test)]
595mod tests {
596 use super::*;
597
598 #[tokio::test]
599 async fn test_invoke() {
600 }
602}