supabase_rust_functions/
lib.rs

1//! Supabase Edge Functions client for Rust
2//!
3//! This crate provides functionality for invoking Supabase Edge Functions.
4
5use base64::Engine;
6use bytes::{BufMut, Bytes, BytesMut};
7use futures_util::{Stream, StreamExt};
8use reqwest::{Client, Response, StatusCode};
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::HashMap;
12use std::pin::Pin;
13use std::time::Duration;
14use thiserror::Error;
15use url::Url;
16
17/// エラー型の詳細
18#[derive(Debug, Clone, Deserialize)]
19pub struct FunctionErrorDetails {
20    pub message: Option<String>,
21    pub status: Option<u16>,
22    pub code: Option<String>,
23    pub details: Option<Value>,
24}
25
26/// エラー型
27#[derive(Debug, Error)]
28pub enum FunctionsError {
29    #[error("Request error: {0}")]
30    RequestError(#[from] reqwest::Error),
31
32    #[error("URL parse error: {0}")]
33    UrlError(#[from] url::ParseError),
34
35    #[error("JSON error: {0}")]
36    JsonError(#[from] serde_json::Error),
37
38    #[error("Function error (status: {status}): {message}")]
39    FunctionError {
40        message: String,
41        status: StatusCode,
42        details: Option<FunctionErrorDetails>,
43    },
44
45    #[error("Timeout error: Function execution exceeded timeout limit")]
46    TimeoutError,
47
48    #[error("Invalid response: {0}")]
49    InvalidResponse(String),
50}
51
52impl FunctionsError {
53    pub fn new(message: String) -> Self {
54        Self::FunctionError {
55            message,
56            status: StatusCode::INTERNAL_SERVER_ERROR,
57            details: None,
58        }
59    }
60
61    pub fn from_response(response: &Response) -> Self {
62        Self::FunctionError {
63            message: format!("Function returned error status: {}", response.status()),
64            status: response.status(),
65            details: None,
66        }
67    }
68
69    pub fn with_details(response: &Response, details: FunctionErrorDetails) -> Self {
70        Self::FunctionError {
71            message: details.message.as_ref().map_or_else(
72                || format!("Function returned error status: {}", response.status()),
73                |msg| msg.clone(),
74            ),
75            status: response.status(),
76            details: Some(details),
77        }
78    }
79}
80
81pub type Result<T> = std::result::Result<T, FunctionsError>;
82
83/// 関数呼び出しオプション
84#[derive(Clone, Debug)]
85pub struct FunctionOptions {
86    /// カスタムHTTPヘッダー
87    pub headers: Option<HashMap<String, String>>,
88
89    /// 関数タイムアウト(秒)
90    pub timeout_seconds: Option<u64>,
91
92    /// レスポンスのコンテンツタイプを指定(デフォルトはJSONとして処理)
93    pub response_type: ResponseType,
94
95    /// リクエストのコンテンツタイプ
96    pub content_type: Option<String>,
97}
98
99impl Default for FunctionOptions {
100    fn default() -> Self {
101        Self {
102            headers: None,
103            timeout_seconds: None,
104            response_type: ResponseType::Json,
105            content_type: None,
106        }
107    }
108}
109
110/// レスポンスの処理方法
111#[derive(Clone, Copy, Debug, PartialEq, Eq)]
112pub enum ResponseType {
113    /// JSONとしてパースする(デフォルト)
114    Json,
115
116    /// テキストとして処理する
117    Text,
118
119    /// バイトデータとして処理する
120    Binary,
121
122    /// ストリームとして処理する
123    Stream,
124}
125
126/// ストリーミングレスポンス用の型
127pub type ByteStream = Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>;
128
129/// 関数レスポンス
130#[derive(Debug, Clone)]
131pub struct FunctionResponse<T> {
132    /// レスポンスデータ
133    pub data: T,
134
135    /// HTTPステータスコード
136    pub status: StatusCode,
137
138    /// レスポンスヘッダー
139    pub headers: HashMap<String, String>,
140}
141
142/// Edge Functions クライアント
143pub struct FunctionsClient {
144    base_url: String,
145    api_key: String,
146    http_client: Client,
147}
148
149/// 関数リクエストを表す構造体
150pub struct FunctionRequest<'a, T> {
151    client: &'a FunctionsClient,
152    function_name: String,
153    _response_type: std::marker::PhantomData<T>,
154}
155
156impl<'a, T: DeserializeOwned> FunctionRequest<'a, T> {
157    /// 関数を実行する
158    pub async fn execute<B: Serialize>(
159        &self,
160        body: Option<B>,
161        options: Option<FunctionOptions>,
162    ) -> Result<T> {
163        let result = self
164            .client
165            .invoke::<T, B>(&self.function_name, body, options)
166            .await?;
167        Ok(result.data)
168    }
169}
170
171impl FunctionsClient {
172    /// 新しい Edge Functions クライアントを作成
173    pub fn new(supabase_url: &str, supabase_key: &str, http_client: Client) -> Self {
174        Self {
175            base_url: supabase_url.to_string(),
176            api_key: supabase_key.to_string(),
177            http_client,
178        }
179    }
180
181    /// Edge Function を呼び出す
182    pub async fn invoke<T: DeserializeOwned, B: Serialize>(
183        &self,
184        function_name: &str,
185        body: Option<B>,
186        options: Option<FunctionOptions>,
187    ) -> Result<FunctionResponse<T>> {
188        let opts = options.unwrap_or_default();
189
190        // URLの構築
191        let mut url = Url::parse(&self.base_url)?;
192        url.path_segments_mut()
193            .map_err(|_| FunctionsError::UrlError(url::ParseError::EmptyHost))?
194            .push("functions")
195            .push("v1")
196            .push(function_name);
197
198        // リクエストの構築
199        let mut request_builder = self
200            .http_client
201            .post(url)
202            .header("apikey", &self.api_key)
203            .header("Authorization", format!("Bearer {}", &self.api_key));
204
205        // リクエストタイムアウトの設定
206        if let Some(timeout) = opts.timeout_seconds {
207            request_builder = request_builder.timeout(Duration::from_secs(timeout));
208        }
209
210        // コンテンツタイプの設定
211        if let Some(content_type) = opts.content_type {
212            request_builder = request_builder.header("Content-Type", content_type);
213        }
214
215        // カスタムヘッダーの追加
216        if let Some(headers) = opts.headers {
217            for (key, value) in headers {
218                request_builder = request_builder.header(key, value);
219            }
220        }
221
222        // リクエストボディの追加
223        if let Some(body_data) = body {
224            request_builder = request_builder.json(&body_data);
225        }
226
227        // リクエストの送信
228        let response = request_builder.send().await.map_err(|e| {
229            if e.is_timeout() {
230                FunctionsError::TimeoutError
231            } else {
232                FunctionsError::from(e)
233            }
234        })?;
235
236        // ステータスコードの確認
237        let status = response.status();
238        if !status.is_success() {
239            // レスポンスのクローンを作成してエラー処理に使用
240            let status_copy = status;
241
242            // エラーレスポンスのパース
243            let error_body = response
244                .text()
245                .await
246                .unwrap_or_else(|_| "Failed to read error response".to_string());
247
248            if let Ok(error_details) = serde_json::from_str::<FunctionErrorDetails>(&error_body) {
249                return Err(FunctionsError::FunctionError {
250                    message: error_details.message.as_ref().map_or_else(
251                        || format!("Function returned error status: {}", status_copy),
252                        |msg| msg.clone(),
253                    ),
254                    status: status_copy,
255                    details: Some(error_details),
256                });
257            } else {
258                return Err(FunctionsError::FunctionError {
259                    message: error_body,
260                    status: status_copy,
261                    details: None,
262                });
263            }
264        }
265
266        // レスポンスヘッダーの抽出
267        let headers = response
268            .headers()
269            .iter()
270            .map(|(name, value)| (name.to_string(), value.to_str().unwrap_or("").to_string()))
271            .collect::<HashMap<String, String>>();
272
273        // レスポンスタイプに応じた処理
274        match opts.response_type {
275            ResponseType::Json => {
276                let data = response.json::<T>().await.map_err(|e| {
277                    FunctionsError::JsonError(serde_json::from_str::<T>("{}").err().unwrap_or_else(
278                        || {
279                            serde_json::Error::io(std::io::Error::new(
280                                std::io::ErrorKind::InvalidData,
281                                e.to_string(),
282                            ))
283                        },
284                    ))
285                })?;
286
287                Ok(FunctionResponse {
288                    data,
289                    status,
290                    headers,
291                })
292            }
293            ResponseType::Text => {
294                // テキスト処理
295                let text = response.text().await?;
296
297                // テキストからデシリアライズを試みる
298                let data: T = serde_json::from_str(&text).unwrap_or_else(|_| {
299                    panic!("Failed to deserialize text response as requested type")
300                });
301
302                Ok(FunctionResponse {
303                    data,
304                    status,
305                    headers,
306                })
307            }
308            ResponseType::Binary => {
309                // バイナリデータ処理
310                let bytes = response.bytes().await?;
311
312                // Base64エンコード(非推奨API対応)
313                let binary_str = base64::engine::general_purpose::STANDARD.encode(&bytes);
314
315                // バイナリデータをデシリアライズ
316                let data: T =
317                    serde_json::from_str(&format!("\"{}\"", binary_str)).unwrap_or_else(|_| {
318                        panic!("Failed to deserialize binary response as requested type")
319                    });
320
321                Ok(FunctionResponse {
322                    data,
323                    status,
324                    headers,
325                })
326            }
327            ResponseType::Stream => {
328                // ストリームレスポンスの場合、通常のデシリアライズではなく
329                // 別のストリーム処理用のメソッドを使用する必要がある
330                Err(FunctionsError::InvalidResponse(
331                    "Stream response type cannot be handled by invoke(). Use invoke_stream() instead.".to_string()
332                ))
333            }
334        }
335    }
336
337    /// JSONを返すファンクションを呼び出す(シンプルなラッパー)
338    pub async fn invoke_json<T: DeserializeOwned, B: Serialize>(
339        &self,
340        function_name: &str,
341        body: Option<B>,
342    ) -> Result<T> {
343        let options = FunctionOptions {
344            response_type: ResponseType::Json,
345            ..Default::default()
346        };
347
348        let response = self
349            .invoke::<T, B>(function_name, body, Some(options))
350            .await?;
351        Ok(response.data)
352    }
353
354    /// テキストを返すファンクションを呼び出す(シンプルなラッパー)
355    pub async fn invoke_text<B: Serialize>(
356        &self,
357        function_name: &str,
358        body: Option<B>,
359    ) -> Result<String> {
360        let options = FunctionOptions {
361            response_type: ResponseType::Text,
362            ..Default::default()
363        };
364
365        let response = self
366            .invoke::<String, B>(function_name, body, Some(options))
367            .await?;
368        Ok(response.data)
369    }
370
371    /// バイナリデータを返すファンクションを呼び出す(シンプルなラッパー)
372    pub async fn invoke_binary<B: Serialize>(
373        &self,
374        function_name: &str,
375        body: Option<B>,
376    ) -> Result<String> {
377        let options = FunctionOptions {
378            response_type: ResponseType::Binary,
379            ..Default::default()
380        };
381
382        let response = self
383            .invoke::<String, B>(function_name, body, Some(options))
384            .await?;
385        Ok(response.data)
386    }
387
388    /// ストリーミングレスポンスを取得するメソッド
389    pub async fn invoke_stream<B: Serialize>(
390        &self,
391        function_name: &str,
392        body: Option<B>,
393        options: Option<FunctionOptions>,
394    ) -> Result<ByteStream> {
395        let opts = options.unwrap_or_else(|| FunctionOptions {
396            response_type: ResponseType::Stream,
397            ..Default::default()
398        });
399
400        // URLの構築
401        let mut url = Url::parse(&self.base_url)?;
402        url.path_segments_mut()
403            .map_err(|_| FunctionsError::UrlError(url::ParseError::EmptyHost))?
404            .push("functions")
405            .push("v1")
406            .push(function_name);
407
408        // リクエストの構築
409        let mut request_builder = self
410            .http_client
411            .post(url)
412            .header("apikey", &self.api_key)
413            .header("Authorization", format!("Bearer {}", &self.api_key));
414
415        // リクエストタイムアウトの設定
416        if let Some(timeout) = opts.timeout_seconds {
417            request_builder = request_builder.timeout(Duration::from_secs(timeout));
418        }
419
420        // コンテンツタイプの設定
421        if let Some(content_type) = opts.content_type {
422            request_builder = request_builder.header("Content-Type", content_type);
423        } else {
424            // デフォルトはJSON
425            request_builder = request_builder.header("Content-Type", "application/json");
426        }
427
428        // カスタムヘッダーの追加
429        if let Some(headers) = opts.headers {
430            for (key, value) in headers {
431                request_builder = request_builder.header(key, value);
432            }
433        }
434
435        // リクエストボディの追加
436        if let Some(body_data) = body {
437            request_builder = request_builder.json(&body_data);
438        }
439
440        // リクエストの送信
441        let response = request_builder.send().await.map_err(|e| {
442            if e.is_timeout() {
443                FunctionsError::TimeoutError
444            } else {
445                FunctionsError::from(e)
446            }
447        })?;
448
449        // ステータスコードの確認
450        let status = response.status();
451        if !status.is_success() {
452            // ステータスコードのコピーを保持
453            let status_copy = status;
454
455            // エラーレスポンスのパース
456            let error_body = response
457                .text()
458                .await
459                .unwrap_or_else(|_| "Failed to read error response".to_string());
460
461            if let Ok(error_details) = serde_json::from_str::<FunctionErrorDetails>(&error_body) {
462                return Err(FunctionsError::FunctionError {
463                    message: error_details.message.as_ref().map_or_else(
464                        || format!("Function returned error status: {}", status_copy),
465                        |msg| msg.clone(),
466                    ),
467                    status: status_copy,
468                    details: Some(error_details),
469                });
470            } else {
471                return Err(FunctionsError::FunctionError {
472                    message: error_body,
473                    status: status_copy,
474                    details: None,
475                });
476            }
477        }
478
479        // ストリームを返す
480        Ok(Box::pin(
481            response
482                .bytes_stream()
483                .map(|result| result.map_err(FunctionsError::from)),
484        ))
485    }
486
487    /// JSONストリームを取得するメソッド(SSE形式のJSONイベントを扱う)
488    pub async fn invoke_json_stream<B: Serialize>(
489        &self,
490        function_name: &str,
491        body: Option<B>,
492        options: Option<FunctionOptions>,
493    ) -> Result<Pin<Box<dyn Stream<Item = Result<Value>> + Send + '_>>> {
494        let byte_stream = self.invoke_stream(function_name, body, options).await?;
495        let json_stream = self.byte_stream_to_json(byte_stream);
496        Ok(json_stream)
497    }
498
499    /// バイトストリームをJSONストリームに変換する
500    fn byte_stream_to_json(
501        &self,
502        stream: ByteStream,
503    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send + '_>> {
504        Box::pin(async_stream::stream! {
505            let mut line_stream = self.stream_to_lines(stream);
506
507            while let Some(line_result) = line_stream.next().await {
508                match line_result {
509                    Ok(line) => {
510                        // 空行はスキップ
511                        if line.trim().is_empty() {
512                            continue;
513                        }
514
515                        // JSON解析を試みる
516                        match serde_json::from_str::<Value>(&line) {
517                            Ok(json_value) => {
518                                yield Ok(json_value);
519                            },
520                            Err(err) => {
521                                yield Err(FunctionsError::JsonError(err));
522                            }
523                        }
524                    },
525                    Err(err) => {
526                        yield Err(err);
527                        break;
528                    }
529                }
530            }
531        })
532    }
533
534    /// ストリームを行に変換する
535    pub fn stream_to_lines(
536        &self,
537        stream: ByteStream,
538    ) -> Pin<Box<dyn Stream<Item = Result<String>> + Send + '_>> {
539        Box::pin(async_stream::stream! {
540            let mut buf = BytesMut::new();
541
542            // 行ごとに処理
543            tokio::pin!(stream);
544            while let Some(chunk_result) = stream.next().await {
545                match chunk_result {
546                    Ok(chunk) => {
547                        buf.extend_from_slice(&chunk);
548
549                        // バッファから完全な行を探して処理
550                        while let Some(i) = buf.iter().position(|&b| b == b'\n') {
551                            let line = if i > 0 && buf[i - 1] == b'\r' {
552                                // CRLF改行の処理
553                                let line = String::from_utf8_lossy(&buf[..i - 1]).to_string();
554                                unsafe { buf.advance_mut(i + 1); }
555                                line
556                            } else {
557                                // LF改行の処理
558                                let line = String::from_utf8_lossy(&buf[..i]).to_string();
559                                unsafe { buf.advance_mut(i + 1); }
560                                line
561                            };
562
563                            yield Ok(line);
564                        }
565                    },
566                    Err(e) => {
567                        yield Err(FunctionsError::from(e));
568                        break;
569                    }
570                }
571            }
572
573            // 最後の行が改行で終わっていない場合も処理
574            if !buf.is_empty() {
575                let line = String::from_utf8_lossy(&buf).to_string();
576                yield Ok(line);
577            }
578        })
579    }
580
581    /// 関数リクエストを作成する
582    pub fn create_request<T: DeserializeOwned>(
583        &self,
584        function_name: &str,
585    ) -> FunctionRequest<'_, T> {
586        FunctionRequest {
587            client: self,
588            function_name: function_name.to_string(),
589            _response_type: std::marker::PhantomData,
590        }
591    }
592}
593
594#[cfg(test)]
595mod tests {
596    use super::*;
597
598    #[tokio::test]
599    async fn test_invoke() {
600        // TODO: モック実装を用いたテスト
601    }
602}