supabase/
functions.rs

1//! Edge Functions module for Supabase
2//!
3//! This module provides functionality to invoke Supabase Edge Functions.
4//! Edge Functions are server-side TypeScript functions that run on the edge,
5//! close to your users for reduced latency.
6//!
7//! ## Features
8//!
9//! - **Standard Invocation**: Traditional request/response function calls
10//! - **Streaming Responses**: Server-sent events and streaming data
11//! - **Function Metadata**: Introspection and function discovery
12//! - **Local Development**: Testing utilities for local functions
13//! - **Enhanced Error Handling**: Detailed error context and retry logic
14
15use crate::{
16    error::{Error, Result},
17    types::SupabaseConfig,
18};
19use reqwest::{Client as HttpClient, Response};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22use std::{collections::HashMap, sync::Arc, time::Duration};
23#[cfg(not(target_arch = "wasm32"))]
24use tokio_stream::Stream;
25use tracing::{debug, info, warn};
26
27/// Edge Functions client for invoking serverless functions
28///
29/// # Examples
30///
31/// Basic function invocation:
32///
33/// ```rust,no_run
34/// use supabase::Client;
35/// use serde_json::json;
36///
37/// # async fn example() -> supabase::Result<()> {
38/// let client = Client::new("your-project-url", "your-anon-key")?;
39///
40/// // Invoke a function with parameters
41/// let result = client.functions()
42///     .invoke("hello-world", Some(json!({"name": "World"})))
43///     .await?;
44///
45/// println!("Function result: {}", result);
46/// # Ok(())
47/// # }
48/// ```
49///
50/// Streaming function responses:
51///
52/// ```rust,no_run
53/// use supabase::Client;
54/// use serde_json::json;
55/// use tokio_stream::StreamExt;
56///
57/// # async fn example() -> supabase::Result<()> {
58/// let client = Client::new("your-project-url", "your-anon-key")?;
59///
60/// // Stream function responses
61/// let mut stream = client.functions()
62///     .invoke_stream("data-processor", Some(json!({"batch_size": 100})))
63///     .await?;
64///
65/// while let Some(chunk) = stream.next().await {
66///     match chunk {
67///         Ok(data) => println!("Received: {:?}", data),
68///         Err(e) => println!("Stream error: {}", e),
69///     }
70/// }
71/// # Ok(())
72/// # }
73/// ```
74#[derive(Debug, Clone)]
75pub struct Functions {
76    http_client: Arc<HttpClient>,
77    config: Arc<SupabaseConfig>,
78}
79
80/// Function metadata and introspection information
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct FunctionMetadata {
83    /// Function name
84    pub name: String,
85    /// Function description
86    pub description: Option<String>,
87    /// Function version
88    pub version: Option<String>,
89    /// Runtime environment
90    pub runtime: Option<String>,
91    /// Memory limit in MB
92    pub memory_limit: Option<u32>,
93    /// Timeout in seconds
94    pub timeout: Option<u32>,
95    /// Environment variables (non-sensitive)
96    pub env_vars: HashMap<String, String>,
97    /// Function status
98    pub status: FunctionStatus,
99    /// Creation timestamp
100    pub created_at: Option<String>,
101    /// Last modified timestamp
102    pub updated_at: Option<String>,
103}
104
105/// Function execution status
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108pub enum FunctionStatus {
109    /// Function is active and can be invoked
110    Active,
111    /// Function is paused/disabled
112    Inactive,
113    /// Function is deploying
114    Deploying,
115    /// Function deployment failed
116    Failed,
117}
118
119/// Configuration for function invocation
120#[derive(Debug, Clone, Default)]
121pub struct InvokeOptions {
122    /// Additional headers to send
123    pub headers: Option<HashMap<String, String>>,
124    /// Function timeout override
125    pub timeout: Option<Duration>,
126    /// Retry configuration
127    pub retry: Option<RetryConfig>,
128    /// Enable streaming response
129    pub streaming: bool,
130}
131
132/// Retry configuration for function invocation
133#[derive(Debug, Clone)]
134pub struct RetryConfig {
135    /// Maximum number of retries
136    pub max_attempts: u32,
137    /// Delay between retries
138    pub delay: Duration,
139    /// Exponential backoff multiplier
140    pub backoff_multiplier: f64,
141    /// Maximum delay between retries
142    pub max_delay: Duration,
143}
144
145impl Default for RetryConfig {
146    fn default() -> Self {
147        Self {
148            max_attempts: 3,
149            delay: Duration::from_millis(1000),
150            backoff_multiplier: 2.0,
151            max_delay: Duration::from_secs(30),
152        }
153    }
154}
155
156/// Streaming chunk from function response
157#[derive(Debug, Clone)]
158pub struct StreamChunk {
159    /// Chunk data
160    pub data: Value,
161    /// Chunk sequence number
162    pub sequence: Option<u64>,
163    /// Whether this is the last chunk
164    pub is_final: bool,
165}
166
167/// Local development configuration
168#[derive(Debug, Clone)]
169pub struct LocalConfig {
170    /// Local functions server URL
171    pub local_url: String,
172    /// Local functions directory
173    pub functions_dir: Option<String>,
174    /// Development server port
175    pub port: Option<u16>,
176}
177
178impl Functions {
179    /// Create a new Functions instance
180    pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
181        debug!("Initializing Functions module");
182
183        Ok(Self {
184            http_client,
185            config,
186        })
187    }
188
189    /// Invoke an Edge Function
190    ///
191    /// # Parameters
192    ///
193    /// * `function_name` - Name of the function to invoke
194    /// * `body` - Optional JSON body to send to the function
195    ///
196    /// # Examples
197    ///
198    /// ```rust,no_run
199    /// use serde_json::json;
200    ///
201    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
202    /// // Simple function call
203    /// let result = functions.invoke("hello", None).await?;
204    ///
205    /// // Function with parameters
206    /// let result = functions.invoke("process-data", Some(json!({
207    ///     "user_id": 123,
208    ///     "action": "update_profile"
209    /// }))).await?;
210    /// # Ok(())
211    /// # }
212    /// ```
213    pub async fn invoke(&self, function_name: &str, body: Option<Value>) -> Result<Value> {
214        self.invoke_with_options(function_name, body, None).await
215    }
216
217    /// Invoke an Edge Function with custom options
218    ///
219    /// # Parameters
220    ///
221    /// * `function_name` - Name of the function to invoke
222    /// * `body` - Optional JSON body to send to the function
223    /// * `headers` - Optional additional headers to send
224    ///
225    /// # Examples
226    ///
227    /// ```rust,no_run
228    /// use supabase::Client;
229    /// use serde_json::json;
230    /// use std::collections::HashMap;
231    ///
232    /// # async fn example() -> supabase::Result<()> {
233    /// let client = Client::new("your-project-url", "your-anon-key")?;
234    ///
235    /// let mut headers = HashMap::new();
236    /// headers.insert("X-Custom-Header".to_string(), "custom-value".to_string());
237    ///
238    /// let result = client.functions()
239    ///     .invoke_with_options("my-function", Some(json!({"data": "value"})), Some(headers))
240    ///     .await?;
241    /// # Ok(())
242    /// # }
243    /// ```
244    pub async fn invoke_with_options(
245        &self,
246        function_name: &str,
247        body: Option<Value>,
248        headers: Option<HashMap<String, String>>,
249    ) -> Result<Value> {
250        debug!("Invoking Edge Function: {}", function_name);
251
252        let url = format!("{}/functions/v1/{}", self.config.url, function_name);
253
254        let mut request = self
255            .http_client
256            .post(&url)
257            .header("Authorization", format!("Bearer {}", self.config.key))
258            .header("Content-Type", "application/json");
259
260        // Add custom headers if provided
261        if let Some(custom_headers) = headers {
262            for (key, value) in custom_headers {
263                request = request.header(key, value);
264            }
265        }
266
267        // Add body if provided
268        if let Some(body) = body {
269            request = request.json(&body);
270        }
271
272        let response = request.send().await?;
273
274        if !response.status().is_success() {
275            let status = response.status();
276            let error_msg = match response.text().await {
277                Ok(text) => {
278                    // Try to parse error message from Supabase
279                    if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
280                        if let Some(message) = error_json.get("message") {
281                            message.as_str().unwrap_or(&text).to_string()
282                        } else {
283                            text
284                        }
285                    } else {
286                        text
287                    }
288                }
289                Err(_) => format!("Function invocation failed with status: {}", status),
290            };
291            return Err(Error::functions(error_msg));
292        }
293
294        let result: Value = response.json().await?;
295        info!("Edge Function {} invoked successfully", function_name);
296
297        Ok(result)
298    }
299
300    /// Invoke an Edge Function with streaming response (native only)
301    ///
302    /// This method enables server-sent events or streaming responses from functions.
303    /// Only available on native platforms (not WASM).
304    ///
305    /// # Parameters
306    ///
307    /// * `function_name` - Name of the function to invoke
308    /// * `body` - Optional JSON body to send to the function
309    ///
310    /// # Examples
311    ///
312    /// ```rust,no_run
313    /// use serde_json::json;
314    /// use tokio_stream::StreamExt;
315    ///
316    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
317    /// let mut stream = functions.invoke_stream("streaming-function", Some(json!({
318    ///     "mode": "realtime",
319    ///     "duration": 60
320    /// }))).await?;
321    ///
322    /// while let Some(chunk) = stream.next().await {
323    ///     match chunk {
324    ///         Ok(data) => println!("Received chunk: {}", data.data),
325    ///         Err(e) => println!("Stream error: {}", e),
326    ///     }
327    /// }
328    /// # Ok(())
329    /// # }
330    /// ```
331    #[cfg(not(target_arch = "wasm32"))]
332    pub async fn invoke_stream(
333        &self,
334        function_name: &str,
335        body: Option<Value>,
336    ) -> Result<impl Stream<Item = Result<StreamChunk>>> {
337        debug!(
338            "Starting streaming invocation of function: {}",
339            function_name
340        );
341
342        let url = format!("{}/functions/v1/{}", self.config.url, function_name);
343
344        let mut request = self
345            .http_client
346            .post(&url)
347            .header("Authorization", format!("Bearer {}", self.config.key))
348            .header("Content-Type", "application/json")
349            .header("Accept", "text/event-stream")
350            .header("Cache-Control", "no-cache");
351
352        // Add body if provided
353        if let Some(body) = body {
354            request = request.json(&body);
355        }
356
357        let response = request.send().await?;
358
359        if !response.status().is_success() {
360            let status = response.status();
361            let error_msg = response.text().await.unwrap_or_else(|_| {
362                format!(
363                    "Streaming function invocation failed with status: {}",
364                    status
365                )
366            });
367            return Err(Error::functions(error_msg));
368        }
369
370        self.process_stream(response).await
371    }
372
373    /// Get metadata for a specific function
374    ///
375    /// # Parameters
376    ///
377    /// * `function_name` - Name of the function to introspect
378    ///
379    /// # Examples
380    ///
381    /// ```rust,no_run
382    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
383    /// let metadata = functions.get_function_metadata("my-function").await?;
384    /// println!("Function: {}", metadata.name);
385    /// println!("Status: {:?}", metadata.status);
386    /// println!("Memory: {:?} MB", metadata.memory_limit);
387    /// # Ok(())
388    /// # }
389    /// ```
390    pub async fn get_function_metadata(&self, function_name: &str) -> Result<FunctionMetadata> {
391        debug!("Fetching metadata for function: {}", function_name);
392
393        let url = format!(
394            "{}/functions/v1/{}/metadata",
395            self.config.url, function_name
396        );
397
398        let response = self
399            .http_client
400            .get(&url)
401            .header("Authorization", format!("Bearer {}", self.config.key))
402            .send()
403            .await?;
404
405        if !response.status().is_success() {
406            let status = response.status();
407            let error_msg = response.text().await.unwrap_or_else(|_| {
408                format!("Failed to fetch function metadata, status: {}", status)
409            });
410            return Err(Error::functions(error_msg));
411        }
412
413        let metadata: FunctionMetadata = response.json().await?;
414        info!("Retrieved metadata for function: {}", function_name);
415
416        Ok(metadata)
417    }
418
419    /// List all available functions with their metadata
420    ///
421    /// # Examples
422    ///
423    /// ```rust,no_run
424    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
425    /// let functions_list = functions.list_functions().await?;
426    /// for func in functions_list {
427    ///     println!("Function: {} - Status: {:?}", func.name, func.status);
428    /// }
429    /// # Ok(())
430    /// # }
431    /// ```
432    pub async fn list_functions(&self) -> Result<Vec<FunctionMetadata>> {
433        debug!("Listing all available functions");
434
435        let url = format!("{}/functions/v1", self.config.url);
436
437        let response = self
438            .http_client
439            .get(&url)
440            .header("Authorization", format!("Bearer {}", self.config.key))
441            .send()
442            .await?;
443
444        if !response.status().is_success() {
445            let status = response.status();
446            let error_msg = response
447                .text()
448                .await
449                .unwrap_or_else(|_| format!("Failed to list functions, status: {}", status));
450            return Err(Error::functions(error_msg));
451        }
452
453        let functions: Vec<FunctionMetadata> = response.json().await?;
454        info!("Retrieved {} functions", functions.len());
455
456        Ok(functions)
457    }
458
459    /// Invoke a function with advanced options
460    ///
461    /// # Parameters
462    ///
463    /// * `function_name` - Name of the function to invoke
464    /// * `body` - Optional JSON body to send to the function
465    /// * `options` - Invocation options (headers, timeout, retry, etc.)
466    ///
467    /// # Examples
468    ///
469    /// ```rust,no_run
470    /// use supabase::functions::{InvokeOptions, RetryConfig};
471    /// use serde_json::json;
472    /// use std::{collections::HashMap, time::Duration};
473    ///
474    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
475    /// let mut headers = HashMap::new();
476    /// headers.insert("X-Priority".to_string(), "high".to_string());
477    ///
478    /// let options = InvokeOptions {
479    ///     headers: Some(headers),
480    ///     timeout: Some(Duration::from_secs(30)),
481    ///     retry: Some(RetryConfig::default()),
482    ///     streaming: false,
483    /// };
484    ///
485    /// let result = functions.invoke_with_advanced_options(
486    ///     "critical-function",
487    ///     Some(json!({"data": "important"})),
488    ///     options
489    /// ).await?;
490    /// # Ok(())
491    /// # }
492    /// ```
493    pub async fn invoke_with_advanced_options(
494        &self,
495        function_name: &str,
496        body: Option<Value>,
497        options: InvokeOptions,
498    ) -> Result<Value> {
499        debug!("Invoking function with advanced options: {}", function_name);
500
501        let mut attempt = 0;
502        let max_attempts = options.retry.as_ref().map(|r| r.max_attempts).unwrap_or(1);
503
504        loop {
505            attempt += 1;
506
507            match self
508                .invoke_function_once(function_name, body.clone(), &options)
509                .await
510            {
511                Ok(result) => return Ok(result),
512                Err(e) if attempt < max_attempts => {
513                    warn!("Function invocation attempt {} failed: {}", attempt, e);
514
515                    if let Some(retry_config) = &options.retry {
516                        let base_delay_ms = retry_config.delay.as_millis() as u64;
517                        let backoff_factor =
518                            retry_config.backoff_multiplier.powi(attempt as i32 - 1);
519                        let calculated_delay_ms = (base_delay_ms as f64 * backoff_factor) as u64;
520                        let max_delay_ms = retry_config.max_delay.as_millis() as u64;
521
522                        let delay_ms = std::cmp::min(calculated_delay_ms, max_delay_ms);
523                        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
524                    }
525                }
526                Err(e) => return Err(e),
527            }
528        }
529    }
530
531    /// Test a function locally (for development)
532    ///
533    /// # Parameters
534    ///
535    /// * `function_name` - Name of the function to test
536    /// * `body` - Optional JSON body to send to the function
537    /// * `local_config` - Local development configuration
538    ///
539    /// # Examples
540    ///
541    /// ```rust,no_run
542    /// use supabase::functions::LocalConfig;
543    /// use serde_json::json;
544    ///
545    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
546    /// let local_config = LocalConfig {
547    ///     local_url: "http://localhost:54321".to_string(),
548    ///     functions_dir: Some("./functions".to_string()),
549    ///     port: Some(54321),
550    /// };
551    ///
552    /// let result = functions.test_local(
553    ///     "my-function",
554    ///     Some(json!({"test": true})),
555    ///     local_config
556    /// ).await?;
557    /// # Ok(())
558    /// # }
559    /// ```
560    pub async fn test_local(
561        &self,
562        function_name: &str,
563        body: Option<Value>,
564        local_config: LocalConfig,
565    ) -> Result<Value> {
566        debug!("Testing function locally: {}", function_name);
567
568        let url = format!("{}/functions/v1/{}", local_config.local_url, function_name);
569
570        let mut request = self
571            .http_client
572            .post(&url)
573            .header("Authorization", format!("Bearer {}", self.config.key))
574            .header("Content-Type", "application/json")
575            .header("X-Local-Test", "true");
576
577        if let Some(body) = body {
578            request = request.json(&body);
579        }
580
581        let response = request.send().await?;
582
583        if !response.status().is_success() {
584            let status = response.status();
585            let error_msg = response
586                .text()
587                .await
588                .unwrap_or_else(|_| format!("Local function test failed with status: {}", status));
589            return Err(Error::functions(error_msg));
590        }
591
592        let result: Value = response.json().await?;
593        info!("Local function test completed: {}", function_name);
594
595        Ok(result)
596    }
597
598    /// Get the base Functions URL
599    pub fn functions_url(&self) -> String {
600        format!("{}/functions/v1", self.config.url)
601    }
602
603    // Private helper methods
604
605    async fn invoke_function_once(
606        &self,
607        function_name: &str,
608        body: Option<Value>,
609        options: &InvokeOptions,
610    ) -> Result<Value> {
611        let url = format!("{}/functions/v1/{}", self.config.url, function_name);
612
613        let mut request = self
614            .http_client
615            .post(&url)
616            .header("Authorization", format!("Bearer {}", self.config.key))
617            .header("Content-Type", "application/json");
618
619        // Add custom headers
620        if let Some(custom_headers) = &options.headers {
621            for (key, value) in custom_headers {
622                request = request.header(key, value);
623            }
624        }
625
626        // Set timeout
627        if let Some(timeout) = options.timeout {
628            request = request.timeout(timeout);
629        }
630
631        // Add body if provided
632        if let Some(body) = body {
633            request = request.json(&body);
634        }
635
636        let response = request.send().await?;
637
638        if !response.status().is_success() {
639            let status = response.status();
640            let error_msg = match response.text().await {
641                Ok(text) => {
642                    // Enhanced error parsing
643                    if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
644                        self.parse_function_error(&error_json)
645                    } else {
646                        text
647                    }
648                }
649                Err(_) => format!("Function invocation failed with status: {}", status),
650            };
651            return Err(Error::functions(error_msg));
652        }
653
654        let result: Value = response.json().await?;
655        Ok(result)
656    }
657
658    #[cfg(not(target_arch = "wasm32"))]
659    async fn process_stream(
660        &self,
661        response: Response,
662    ) -> Result<impl Stream<Item = Result<StreamChunk>>> {
663        use tokio_stream::StreamExt;
664
665        // Simplified streaming - read response as text and split by lines
666        let text = response.text().await?;
667        let lines: Vec<String> = text.lines().map(|s| s.to_string()).collect();
668
669        let stream = tokio_stream::iter(lines.into_iter().map(Ok::<String, Error>));
670
671        Ok(
672            stream.map(|line_result: Result<String>| -> Result<StreamChunk> {
673                let line = line_result?;
674
675                // Parse Server-Sent Events format
676                if let Some(data_str) = line.strip_prefix("data: ") {
677                    // Remove "data: " prefix
678                    if data_str == "[DONE]" {
679                        return Ok(StreamChunk {
680                            data: Value::Null,
681                            sequence: None,
682                            is_final: true,
683                        });
684                    }
685
686                    let data: Value = serde_json::from_str(data_str).map_err(|e| {
687                        Error::functions(format!("Failed to parse stream data: {}", e))
688                    })?;
689
690                    Ok(StreamChunk {
691                        data,
692                        sequence: None,
693                        is_final: false,
694                    })
695                } else if !line.is_empty() && !line.starts_with(':') {
696                    // Skip non-data lines (event:, id:, etc.) and empty lines
697                    Ok(StreamChunk {
698                        data: Value::Null,
699                        sequence: None,
700                        is_final: false,
701                    })
702                } else {
703                    Ok(StreamChunk {
704                        data: Value::Null,
705                        sequence: None,
706                        is_final: false,
707                    })
708                }
709            }),
710        )
711    }
712
713    fn parse_function_error(&self, error_json: &Value) -> String {
714        // Enhanced error parsing for different error formats
715        if let Some(message) = error_json.get("error") {
716            if let Some(details) = message.get("message") {
717                return details.as_str().unwrap_or("Unknown error").to_string();
718            }
719            return message.as_str().unwrap_or("Unknown error").to_string();
720        }
721
722        if let Some(message) = error_json.get("message") {
723            return message.as_str().unwrap_or("Unknown error").to_string();
724        }
725
726        if let Some(details) = error_json.get("details") {
727            return details.as_str().unwrap_or("Unknown error").to_string();
728        }
729
730        "Function execution failed".to_string()
731    }
732}
733
734#[cfg(test)]
735mod tests {
736    use super::*;
737    use crate::types::{AuthConfig, DatabaseConfig, HttpConfig, StorageConfig, SupabaseConfig};
738
739    fn create_test_functions() -> Functions {
740        let config = Arc::new(SupabaseConfig {
741            url: "http://localhost:54321".to_string(),
742            key: "test-key".to_string(),
743            service_role_key: None,
744            http_config: HttpConfig::default(),
745            auth_config: AuthConfig::default(),
746            database_config: DatabaseConfig::default(),
747            storage_config: StorageConfig::default(),
748        });
749
750        let http_client = Arc::new(HttpClient::new());
751        Functions::new(config, http_client).unwrap()
752    }
753
754    #[test]
755    fn test_functions_creation() {
756        let functions = create_test_functions();
757        assert_eq!(
758            functions.functions_url(),
759            "http://localhost:54321/functions/v1"
760        );
761    }
762
763    #[test]
764    fn test_functions_url_generation() {
765        let functions = create_test_functions();
766        assert_eq!(
767            functions.functions_url(),
768            "http://localhost:54321/functions/v1"
769        );
770    }
771}