supabase/
functions.rs

1//! Edge Functions module for Supabase
2//!
3//! This module provides functionality to invoke Supabase Edge Functions.
4//! Edge Functions are server-side TypeScript functions that run on the edge,
5//! close to your users for reduced latency.
6//!
7//! ## Features
8//!
9//! - **Standard Invocation**: Traditional request/response function calls
10//! - **Streaming Responses**: Server-sent events and streaming data
11//! - **Function Metadata**: Introspection and function discovery
12//! - **Local Development**: Testing utilities for local functions
13//! - **Enhanced Error Handling**: Detailed error context and retry logic
14
15use crate::{
16    error::{Error, Result},
17    types::SupabaseConfig,
18};
19use reqwest::Client as HttpClient;
20#[cfg(not(target_arch = "wasm32"))]
21use reqwest::Response;
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24use std::{collections::HashMap, sync::Arc, time::Duration};
25#[cfg(not(target_arch = "wasm32"))]
26use tokio_stream::Stream;
27use tracing::{debug, info, warn};
28
29// Helper for async sleep across platforms
30#[cfg(not(target_arch = "wasm32"))]
31async fn async_sleep(duration: Duration) {
32    tokio::time::sleep(duration).await;
33}
34
35#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
36async fn async_sleep(duration: Duration) {
37    use gloo_timers::future::sleep as gloo_sleep;
38    gloo_sleep(duration).await;
39}
40
41#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))]
42async fn async_sleep(_duration: Duration) {
43    // No-op for wasm32 without wasm feature (retry delays not supported)
44}
45
46/// Edge Functions client for invoking serverless functions
47///
48/// # Examples
49///
50/// Basic function invocation:
51///
52/// ```rust,no_run
53/// use supabase::Client;
54/// use serde_json::json;
55///
56/// # async fn example() -> supabase::Result<()> {
57/// let client = Client::new("your-project-url", "your-anon-key")?;
58///
59/// // Invoke a function with parameters
60/// let result = client.functions()
61///     .invoke("hello-world", Some(json!({"name": "World"})))
62///     .await?;
63///
64/// println!("Function result: {}", result);
65/// # Ok(())
66/// # }
67/// ```
68///
69/// Streaming function responses:
70///
71/// ```rust,no_run
72/// use supabase::Client;
73/// use serde_json::json;
74/// use tokio_stream::StreamExt;
75///
76/// # async fn example() -> supabase::Result<()> {
77/// let client = Client::new("your-project-url", "your-anon-key")?;
78///
79/// // Stream function responses
80/// let mut stream = client.functions()
81///     .invoke_stream("data-processor", Some(json!({"batch_size": 100})))
82///     .await?;
83///
84/// while let Some(chunk) = stream.next().await {
85///     match chunk {
86///         Ok(data) => println!("Received: {:?}", data),
87///         Err(e) => println!("Stream error: {}", e),
88///     }
89/// }
90/// # Ok(())
91/// # }
92/// ```
93#[derive(Debug, Clone)]
94pub struct Functions {
95    http_client: Arc<HttpClient>,
96    config: Arc<SupabaseConfig>,
97}
98
99/// Function metadata and introspection information
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct FunctionMetadata {
102    /// Function name
103    pub name: String,
104    /// Function description
105    pub description: Option<String>,
106    /// Function version
107    pub version: Option<String>,
108    /// Runtime environment
109    pub runtime: Option<String>,
110    /// Memory limit in MB
111    pub memory_limit: Option<u32>,
112    /// Timeout in seconds
113    pub timeout: Option<u32>,
114    /// Environment variables (non-sensitive)
115    pub env_vars: HashMap<String, String>,
116    /// Function status
117    pub status: FunctionStatus,
118    /// Creation timestamp
119    pub created_at: Option<String>,
120    /// Last modified timestamp
121    pub updated_at: Option<String>,
122}
123
124/// Function execution status
125#[derive(Debug, Clone, Serialize, Deserialize)]
126#[serde(rename_all = "lowercase")]
127pub enum FunctionStatus {
128    /// Function is active and can be invoked
129    Active,
130    /// Function is paused/disabled
131    Inactive,
132    /// Function is deploying
133    Deploying,
134    /// Function deployment failed
135    Failed,
136}
137
138/// Configuration for function invocation
139#[derive(Debug, Clone, Default)]
140pub struct InvokeOptions {
141    /// Additional headers to send
142    pub headers: Option<HashMap<String, String>>,
143    /// Function timeout override
144    pub timeout: Option<Duration>,
145    /// Retry configuration
146    pub retry: Option<RetryConfig>,
147    /// Enable streaming response
148    pub streaming: bool,
149}
150
/// Retry policy used by `Functions::invoke_with_advanced_options`.
///
/// The wait before the next attempt, after attempt number `n` (1-based) has
/// failed, is `delay * backoff_multiplier^(n - 1)`, capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Upper bound on the total number of attempts, the first call included
    /// (so `3` allows the initial call plus up to two retries).
    pub max_attempts: u32,
    /// Wait applied after the first failed attempt.
    pub delay: Duration,
    /// Factor the wait is multiplied by after each further failure.
    pub backoff_multiplier: f64,
    /// Ceiling for the wait between two attempts.
    pub max_delay: Duration,
}

impl Default for RetryConfig {
    /// Three attempts, starting at a one second wait that doubles after each
    /// failure and never exceeds thirty seconds.
    fn default() -> Self {
        let delay = Duration::from_secs(1);
        let max_delay = Duration::from_secs(30);

        Self {
            max_attempts: 3,
            delay,
            backoff_multiplier: 2.0,
            max_delay,
        }
    }
}
174
175/// Streaming chunk from function response
176#[derive(Debug, Clone)]
177pub struct StreamChunk {
178    /// Chunk data
179    pub data: Value,
180    /// Chunk sequence number
181    pub sequence: Option<u64>,
182    /// Whether this is the last chunk
183    pub is_final: bool,
184}
185
/// Settings describing a locally running functions server, consumed by
/// `Functions::test_local`.
#[derive(Debug, Clone)]
pub struct LocalConfig {
    /// Base URL of the local functions server; `/functions/v1/<name>` is
    /// appended to it when a function is tested.
    pub local_url: String,
    /// Directory containing the local function sources, if known.
    pub functions_dir: Option<String>,
    /// Port of the development server, if known.
    pub port: Option<u16>,
}
196
197impl Functions {
198    /// Create a new Functions instance
199    pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
200        debug!("Initializing Functions module");
201
202        Ok(Self {
203            http_client,
204            config,
205        })
206    }
207
208    /// Invoke an Edge Function
209    ///
210    /// # Parameters
211    ///
212    /// * `function_name` - Name of the function to invoke
213    /// * `body` - Optional JSON body to send to the function
214    ///
215    /// # Examples
216    ///
217    /// ```rust,no_run
218    /// use serde_json::json;
219    ///
220    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
221    /// // Simple function call
222    /// let result = functions.invoke("hello", None).await?;
223    ///
224    /// // Function with parameters
225    /// let result = functions.invoke("process-data", Some(json!({
226    ///     "user_id": 123,
227    ///     "action": "update_profile"
228    /// }))).await?;
229    /// # Ok(())
230    /// # }
231    /// ```
232    pub async fn invoke(&self, function_name: &str, body: Option<Value>) -> Result<Value> {
233        self.invoke_with_options(function_name, body, None).await
234    }
235
236    /// Invoke an Edge Function with custom options
237    ///
238    /// # Parameters
239    ///
240    /// * `function_name` - Name of the function to invoke
241    /// * `body` - Optional JSON body to send to the function
242    /// * `headers` - Optional additional headers to send
243    ///
244    /// # Examples
245    ///
246    /// ```rust,no_run
247    /// use supabase::Client;
248    /// use serde_json::json;
249    /// use std::collections::HashMap;
250    ///
251    /// # async fn example() -> supabase::Result<()> {
252    /// let client = Client::new("your-project-url", "your-anon-key")?;
253    ///
254    /// let mut headers = HashMap::new();
255    /// headers.insert("X-Custom-Header".to_string(), "custom-value".to_string());
256    ///
257    /// let result = client.functions()
258    ///     .invoke_with_options("my-function", Some(json!({"data": "value"})), Some(headers))
259    ///     .await?;
260    /// # Ok(())
261    /// # }
262    /// ```
263    pub async fn invoke_with_options(
264        &self,
265        function_name: &str,
266        body: Option<Value>,
267        headers: Option<HashMap<String, String>>,
268    ) -> Result<Value> {
269        debug!("Invoking Edge Function: {}", function_name);
270
271        let url = format!("{}/functions/v1/{}", self.config.url, function_name);
272
273        let mut request = self
274            .http_client
275            .post(&url)
276            .header("Authorization", format!("Bearer {}", self.config.key))
277            .header("Content-Type", "application/json");
278
279        // Add custom headers if provided
280        if let Some(custom_headers) = headers {
281            for (key, value) in custom_headers {
282                request = request.header(key, value);
283            }
284        }
285
286        // Add body if provided
287        if let Some(body) = body {
288            request = request.json(&body);
289        }
290
291        let response = request.send().await?;
292
293        if !response.status().is_success() {
294            let status = response.status();
295            let error_msg = match response.text().await {
296                Ok(text) => {
297                    // Try to parse error message from Supabase
298                    if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
299                        if let Some(message) = error_json.get("message") {
300                            message.as_str().unwrap_or(&text).to_string()
301                        } else {
302                            text
303                        }
304                    } else {
305                        text
306                    }
307                }
308                Err(_) => format!("Function invocation failed with status: {}", status),
309            };
310            return Err(Error::functions(error_msg));
311        }
312
313        let result: Value = response.json().await?;
314        info!("Edge Function {} invoked successfully", function_name);
315
316        Ok(result)
317    }
318
319    /// Invoke an Edge Function with streaming response (native only)
320    ///
321    /// This method enables server-sent events or streaming responses from functions.
322    /// Only available on native platforms (not WASM).
323    ///
324    /// # Parameters
325    ///
326    /// * `function_name` - Name of the function to invoke
327    /// * `body` - Optional JSON body to send to the function
328    ///
329    /// # Examples
330    ///
331    /// ```rust,no_run
332    /// use serde_json::json;
333    /// use tokio_stream::StreamExt;
334    ///
335    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
336    /// let mut stream = functions.invoke_stream("streaming-function", Some(json!({
337    ///     "mode": "realtime",
338    ///     "duration": 60
339    /// }))).await?;
340    ///
341    /// while let Some(chunk) = stream.next().await {
342    ///     match chunk {
343    ///         Ok(data) => println!("Received chunk: {}", data.data),
344    ///         Err(e) => println!("Stream error: {}", e),
345    ///     }
346    /// }
347    /// # Ok(())
348    /// # }
349    /// ```
350    #[cfg(not(target_arch = "wasm32"))]
351    pub async fn invoke_stream(
352        &self,
353        function_name: &str,
354        body: Option<Value>,
355    ) -> Result<impl Stream<Item = Result<StreamChunk>>> {
356        debug!(
357            "Starting streaming invocation of function: {}",
358            function_name
359        );
360
361        let url = format!("{}/functions/v1/{}", self.config.url, function_name);
362
363        let mut request = self
364            .http_client
365            .post(&url)
366            .header("Authorization", format!("Bearer {}", self.config.key))
367            .header("Content-Type", "application/json")
368            .header("Accept", "text/event-stream")
369            .header("Cache-Control", "no-cache");
370
371        // Add body if provided
372        if let Some(body) = body {
373            request = request.json(&body);
374        }
375
376        let response = request.send().await?;
377
378        if !response.status().is_success() {
379            let status = response.status();
380            let error_msg = response.text().await.unwrap_or_else(|_| {
381                format!(
382                    "Streaming function invocation failed with status: {}",
383                    status
384                )
385            });
386            return Err(Error::functions(error_msg));
387        }
388
389        self.process_stream(response).await
390    }
391
392    /// Get metadata for a specific function
393    ///
394    /// # Parameters
395    ///
396    /// * `function_name` - Name of the function to introspect
397    ///
398    /// # Examples
399    ///
400    /// ```rust,no_run
401    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
402    /// let metadata = functions.get_function_metadata("my-function").await?;
403    /// println!("Function: {}", metadata.name);
404    /// println!("Status: {:?}", metadata.status);
405    /// println!("Memory: {:?} MB", metadata.memory_limit);
406    /// # Ok(())
407    /// # }
408    /// ```
409    pub async fn get_function_metadata(&self, function_name: &str) -> Result<FunctionMetadata> {
410        debug!("Fetching metadata for function: {}", function_name);
411
412        let url = format!(
413            "{}/functions/v1/{}/metadata",
414            self.config.url, function_name
415        );
416
417        let response = self
418            .http_client
419            .get(&url)
420            .header("Authorization", format!("Bearer {}", self.config.key))
421            .send()
422            .await?;
423
424        if !response.status().is_success() {
425            let status = response.status();
426            let error_msg = response.text().await.unwrap_or_else(|_| {
427                format!("Failed to fetch function metadata, status: {}", status)
428            });
429            return Err(Error::functions(error_msg));
430        }
431
432        let metadata: FunctionMetadata = response.json().await?;
433        info!("Retrieved metadata for function: {}", function_name);
434
435        Ok(metadata)
436    }
437
438    /// List all available functions with their metadata
439    ///
440    /// # Examples
441    ///
442    /// ```rust,no_run
443    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
444    /// let functions_list = functions.list_functions().await?;
445    /// for func in functions_list {
446    ///     println!("Function: {} - Status: {:?}", func.name, func.status);
447    /// }
448    /// # Ok(())
449    /// # }
450    /// ```
451    pub async fn list_functions(&self) -> Result<Vec<FunctionMetadata>> {
452        debug!("Listing all available functions");
453
454        let url = format!("{}/functions/v1", self.config.url);
455
456        let response = self
457            .http_client
458            .get(&url)
459            .header("Authorization", format!("Bearer {}", self.config.key))
460            .send()
461            .await?;
462
463        if !response.status().is_success() {
464            let status = response.status();
465            let error_msg = response
466                .text()
467                .await
468                .unwrap_or_else(|_| format!("Failed to list functions, status: {}", status));
469            return Err(Error::functions(error_msg));
470        }
471
472        let functions: Vec<FunctionMetadata> = response.json().await?;
473        info!("Retrieved {} functions", functions.len());
474
475        Ok(functions)
476    }
477
478    /// Invoke a function with advanced options
479    ///
480    /// # Parameters
481    ///
482    /// * `function_name` - Name of the function to invoke
483    /// * `body` - Optional JSON body to send to the function
484    /// * `options` - Invocation options (headers, timeout, retry, etc.)
485    ///
486    /// # Examples
487    ///
488    /// ```rust,no_run
489    /// use supabase::functions::{InvokeOptions, RetryConfig};
490    /// use serde_json::json;
491    /// use std::{collections::HashMap, time::Duration};
492    ///
493    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
494    /// let mut headers = HashMap::new();
495    /// headers.insert("X-Priority".to_string(), "high".to_string());
496    ///
497    /// let options = InvokeOptions {
498    ///     headers: Some(headers),
499    ///     timeout: Some(Duration::from_secs(30)),
500    ///     retry: Some(RetryConfig::default()),
501    ///     streaming: false,
502    /// };
503    ///
504    /// let result = functions.invoke_with_advanced_options(
505    ///     "critical-function",
506    ///     Some(json!({"data": "important"})),
507    ///     options
508    /// ).await?;
509    /// # Ok(())
510    /// # }
511    /// ```
512    pub async fn invoke_with_advanced_options(
513        &self,
514        function_name: &str,
515        body: Option<Value>,
516        options: InvokeOptions,
517    ) -> Result<Value> {
518        debug!("Invoking function with advanced options: {}", function_name);
519
520        let mut attempt = 0;
521        let max_attempts = options.retry.as_ref().map(|r| r.max_attempts).unwrap_or(1);
522
523        loop {
524            attempt += 1;
525
526            match self
527                .invoke_function_once(function_name, body.clone(), &options)
528                .await
529            {
530                Ok(result) => return Ok(result),
531                Err(e) if attempt < max_attempts => {
532                    warn!("Function invocation attempt {} failed: {}", attempt, e);
533
534                    if let Some(retry_config) = &options.retry {
535                        let base_delay_ms = retry_config.delay.as_millis() as u64;
536                        let backoff_factor =
537                            retry_config.backoff_multiplier.powi(attempt as i32 - 1);
538                        let calculated_delay_ms = (base_delay_ms as f64 * backoff_factor) as u64;
539                        let max_delay_ms = retry_config.max_delay.as_millis() as u64;
540
541                        let delay_ms = std::cmp::min(calculated_delay_ms, max_delay_ms);
542                        async_sleep(Duration::from_millis(delay_ms)).await;
543                    }
544                }
545                Err(e) => return Err(e),
546            }
547        }
548    }
549
550    /// Test a function locally (for development)
551    ///
552    /// # Parameters
553    ///
554    /// * `function_name` - Name of the function to test
555    /// * `body` - Optional JSON body to send to the function
556    /// * `local_config` - Local development configuration
557    ///
558    /// # Examples
559    ///
560    /// ```rust,no_run
561    /// use supabase::functions::LocalConfig;
562    /// use serde_json::json;
563    ///
564    /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
565    /// let local_config = LocalConfig {
566    ///     local_url: "http://localhost:54321".to_string(),
567    ///     functions_dir: Some("./functions".to_string()),
568    ///     port: Some(54321),
569    /// };
570    ///
571    /// let result = functions.test_local(
572    ///     "my-function",
573    ///     Some(json!({"test": true})),
574    ///     local_config
575    /// ).await?;
576    /// # Ok(())
577    /// # }
578    /// ```
579    pub async fn test_local(
580        &self,
581        function_name: &str,
582        body: Option<Value>,
583        local_config: LocalConfig,
584    ) -> Result<Value> {
585        debug!("Testing function locally: {}", function_name);
586
587        let url = format!("{}/functions/v1/{}", local_config.local_url, function_name);
588
589        let mut request = self
590            .http_client
591            .post(&url)
592            .header("Authorization", format!("Bearer {}", self.config.key))
593            .header("Content-Type", "application/json")
594            .header("X-Local-Test", "true");
595
596        if let Some(body) = body {
597            request = request.json(&body);
598        }
599
600        let response = request.send().await?;
601
602        if !response.status().is_success() {
603            let status = response.status();
604            let error_msg = response
605                .text()
606                .await
607                .unwrap_or_else(|_| format!("Local function test failed with status: {}", status));
608            return Err(Error::functions(error_msg));
609        }
610
611        let result: Value = response.json().await?;
612        info!("Local function test completed: {}", function_name);
613
614        Ok(result)
615    }
616
617    /// Get the base Functions URL
618    pub fn functions_url(&self) -> String {
619        format!("{}/functions/v1", self.config.url)
620    }
621
622    // Private helper methods
623
624    async fn invoke_function_once(
625        &self,
626        function_name: &str,
627        body: Option<Value>,
628        options: &InvokeOptions,
629    ) -> Result<Value> {
630        let url = format!("{}/functions/v1/{}", self.config.url, function_name);
631
632        let mut request = self
633            .http_client
634            .post(&url)
635            .header("Authorization", format!("Bearer {}", self.config.key))
636            .header("Content-Type", "application/json");
637
638        // Add custom headers
639        if let Some(custom_headers) = &options.headers {
640            for (key, value) in custom_headers {
641                request = request.header(key, value);
642            }
643        }
644
645        // Set timeout
646        if let Some(timeout) = options.timeout {
647            request = request.timeout(timeout);
648        }
649
650        // Add body if provided
651        if let Some(body) = body {
652            request = request.json(&body);
653        }
654
655        let response = request.send().await?;
656
657        if !response.status().is_success() {
658            let status = response.status();
659            let error_msg = match response.text().await {
660                Ok(text) => {
661                    // Enhanced error parsing
662                    if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
663                        self.parse_function_error(&error_json)
664                    } else {
665                        text
666                    }
667                }
668                Err(_) => format!("Function invocation failed with status: {}", status),
669            };
670            return Err(Error::functions(error_msg));
671        }
672
673        let result: Value = response.json().await?;
674        Ok(result)
675    }
676
677    #[cfg(not(target_arch = "wasm32"))]
678    async fn process_stream(
679        &self,
680        response: Response,
681    ) -> Result<impl Stream<Item = Result<StreamChunk>>> {
682        use tokio_stream::StreamExt;
683
684        // Simplified streaming - read response as text and split by lines
685        let text = response.text().await?;
686        let lines: Vec<String> = text.lines().map(|s| s.to_string()).collect();
687
688        let stream = tokio_stream::iter(lines.into_iter().map(Ok::<String, Error>));
689
690        Ok(
691            stream.map(|line_result: Result<String>| -> Result<StreamChunk> {
692                let line = line_result?;
693
694                // Parse Server-Sent Events format
695                if let Some(data_str) = line.strip_prefix("data: ") {
696                    // Remove "data: " prefix
697                    if data_str == "[DONE]" {
698                        return Ok(StreamChunk {
699                            data: Value::Null,
700                            sequence: None,
701                            is_final: true,
702                        });
703                    }
704
705                    let data: Value = serde_json::from_str(data_str).map_err(|e| {
706                        Error::functions(format!("Failed to parse stream data: {}", e))
707                    })?;
708
709                    Ok(StreamChunk {
710                        data,
711                        sequence: None,
712                        is_final: false,
713                    })
714                } else if !line.is_empty() && !line.starts_with(':') {
715                    // Skip non-data lines (event:, id:, etc.) and empty lines
716                    Ok(StreamChunk {
717                        data: Value::Null,
718                        sequence: None,
719                        is_final: false,
720                    })
721                } else {
722                    Ok(StreamChunk {
723                        data: Value::Null,
724                        sequence: None,
725                        is_final: false,
726                    })
727                }
728            }),
729        )
730    }
731
732    fn parse_function_error(&self, error_json: &Value) -> String {
733        // Enhanced error parsing for different error formats
734        if let Some(message) = error_json.get("error") {
735            if let Some(details) = message.get("message") {
736                return details.as_str().unwrap_or("Unknown error").to_string();
737            }
738            return message.as_str().unwrap_or("Unknown error").to_string();
739        }
740
741        if let Some(message) = error_json.get("message") {
742            return message.as_str().unwrap_or("Unknown error").to_string();
743        }
744
745        if let Some(details) = error_json.get("details") {
746            return details.as_str().unwrap_or("Unknown error").to_string();
747        }
748
749        "Function execution failed".to_string()
750    }
751}
752
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AuthConfig, DatabaseConfig, HttpConfig, StorageConfig, SupabaseConfig};

    /// Builds a `Functions` client whose project URL is `url`; nothing is sent
    /// over the network.
    fn create_functions_for(url: &str) -> Functions {
        let config = Arc::new(SupabaseConfig {
            url: url.to_string(),
            key: "test-key".to_string(),
            service_role_key: None,
            http_config: HttpConfig::default(),
            auth_config: AuthConfig::default(),
            database_config: DatabaseConfig::default(),
            storage_config: StorageConfig::default(),
        });

        let http_client = Arc::new(HttpClient::new());
        Functions::new(config, http_client).unwrap()
    }

    /// Client pointing at the conventional local development URL.
    fn create_test_functions() -> Functions {
        create_functions_for("http://localhost:54321")
    }

    #[test]
    fn test_functions_creation() {
        let functions = create_test_functions();
        assert_eq!(
            functions.functions_url(),
            "http://localhost:54321/functions/v1"
        );
    }

    // The base URL must follow whatever project URL the configuration holds.
    #[test]
    fn test_functions_url_generation() {
        let functions = create_functions_for("https://example.supabase.co");
        assert_eq!(
            functions.functions_url(),
            "https://example.supabase.co/functions/v1"
        );
    }

    #[test]
    fn test_retry_config_default() {
        let retry = RetryConfig::default();
        assert_eq!(retry.max_attempts, 3);
        assert_eq!(retry.delay, Duration::from_millis(1000));
        assert_eq!(retry.backoff_multiplier, 2.0);
        assert_eq!(retry.max_delay, Duration::from_secs(30));
    }

    #[test]
    fn test_invoke_options_default() {
        let options = InvokeOptions::default();
        assert!(options.headers.is_none());
        assert!(options.timeout.is_none());
        assert!(options.retry.is_none());
        assert!(!options.streaming);
    }
}