supabase/functions.rs
1//! Edge Functions module for Supabase
2//!
3//! This module provides functionality to invoke Supabase Edge Functions.
4//! Edge Functions are server-side TypeScript functions that run on the edge,
5//! close to your users for reduced latency.
6//!
7//! ## Features
8//!
9//! - **Standard Invocation**: Traditional request/response function calls
10//! - **Streaming Responses**: Server-sent events and streaming data
11//! - **Function Metadata**: Introspection and function discovery
12//! - **Local Development**: Testing utilities for local functions
13//! - **Enhanced Error Handling**: Detailed error context and retry logic
14
15use crate::{
16 error::{Error, Result},
17 types::SupabaseConfig,
18};
19use reqwest::Client as HttpClient;
20#[cfg(not(target_arch = "wasm32"))]
21use reqwest::Response;
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24use std::{collections::HashMap, sync::Arc, time::Duration};
25#[cfg(not(target_arch = "wasm32"))]
26use tokio_stream::Stream;
27use tracing::{debug, info, warn};
28
29// Helper for async sleep across platforms
30#[cfg(not(target_arch = "wasm32"))]
31async fn async_sleep(duration: Duration) {
32 tokio::time::sleep(duration).await;
33}
34
35#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
36async fn async_sleep(duration: Duration) {
37 use gloo_timers::future::sleep as gloo_sleep;
38 gloo_sleep(duration).await;
39}
40
41#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))]
42async fn async_sleep(_duration: Duration) {
43 // No-op for wasm32 without wasm feature (retry delays not supported)
44}
45
46/// Edge Functions client for invoking serverless functions
47///
48/// # Examples
49///
50/// Basic function invocation:
51///
52/// ```rust,no_run
53/// use supabase::Client;
54/// use serde_json::json;
55///
56/// # async fn example() -> supabase::Result<()> {
57/// let client = Client::new("your-project-url", "your-anon-key")?;
58///
59/// // Invoke a function with parameters
60/// let result = client.functions()
61/// .invoke("hello-world", Some(json!({"name": "World"})))
62/// .await?;
63///
64/// println!("Function result: {}", result);
65/// # Ok(())
66/// # }
67/// ```
68///
69/// Streaming function responses:
70///
71/// ```rust,no_run
72/// use supabase::Client;
73/// use serde_json::json;
74/// use tokio_stream::StreamExt;
75///
76/// # async fn example() -> supabase::Result<()> {
77/// let client = Client::new("your-project-url", "your-anon-key")?;
78///
79/// // Stream function responses
80/// let mut stream = client.functions()
81/// .invoke_stream("data-processor", Some(json!({"batch_size": 100})))
82/// .await?;
83///
84/// while let Some(chunk) = stream.next().await {
85/// match chunk {
86/// Ok(data) => println!("Received: {:?}", data),
87/// Err(e) => println!("Stream error: {}", e),
88/// }
89/// }
90/// # Ok(())
91/// # }
92/// ```
93#[derive(Debug, Clone)]
94pub struct Functions {
95 http_client: Arc<HttpClient>,
96 config: Arc<SupabaseConfig>,
97}
98
99/// Function metadata and introspection information
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct FunctionMetadata {
102 /// Function name
103 pub name: String,
104 /// Function description
105 pub description: Option<String>,
106 /// Function version
107 pub version: Option<String>,
108 /// Runtime environment
109 pub runtime: Option<String>,
110 /// Memory limit in MB
111 pub memory_limit: Option<u32>,
112 /// Timeout in seconds
113 pub timeout: Option<u32>,
114 /// Environment variables (non-sensitive)
115 pub env_vars: HashMap<String, String>,
116 /// Function status
117 pub status: FunctionStatus,
118 /// Creation timestamp
119 pub created_at: Option<String>,
120 /// Last modified timestamp
121 pub updated_at: Option<String>,
122}
123
124/// Function execution status
125#[derive(Debug, Clone, Serialize, Deserialize)]
126#[serde(rename_all = "lowercase")]
127pub enum FunctionStatus {
128 /// Function is active and can be invoked
129 Active,
130 /// Function is paused/disabled
131 Inactive,
132 /// Function is deploying
133 Deploying,
134 /// Function deployment failed
135 Failed,
136}
137
138/// Configuration for function invocation
139#[derive(Debug, Clone, Default)]
140pub struct InvokeOptions {
141 /// Additional headers to send
142 pub headers: Option<HashMap<String, String>>,
143 /// Function timeout override
144 pub timeout: Option<Duration>,
145 /// Retry configuration
146 pub retry: Option<RetryConfig>,
147 /// Enable streaming response
148 pub streaming: bool,
149}
150
/// Retry policy applied by `Functions::invoke_with_advanced_options`.
///
/// The wait before retry number `k` (1-based) is
/// `delay * backoff_multiplier^(k - 1)`, capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Total number of attempts, counting the initial call. The retry loop
    /// always performs at least one attempt, so `0` behaves like `1`.
    pub max_attempts: u32,
    /// Base delay, used as-is before the first retry
    pub delay: Duration,
    /// Factor the delay is multiplied by for each further retry
    pub backoff_multiplier: f64,
    /// Upper bound applied to the computed delay
    pub max_delay: Duration,
}

impl Default for RetryConfig {
    /// Three attempts, starting at a one-second delay that doubles per retry
    /// and never exceeds thirty seconds.
    fn default() -> Self {
        RetryConfig {
            max_attempts: 3,
            delay: Duration::from_secs(1),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(30),
        }
    }
}
174
175/// Streaming chunk from function response
176#[derive(Debug, Clone)]
177pub struct StreamChunk {
178 /// Chunk data
179 pub data: Value,
180 /// Chunk sequence number
181 pub sequence: Option<u64>,
182 /// Whether this is the last chunk
183 pub is_final: bool,
184}
185
/// Settings for running a function against a local development server.
#[derive(Debug, Clone)]
pub struct LocalConfig {
    /// Base URL of the local functions server; `/functions/v1/<name>` is appended to it
    pub local_url: String,
    /// Directory holding the function sources.
    ///
    /// NOTE(review): `Functions::test_local` does not read this field.
    pub functions_dir: Option<String>,
    /// Port of the development server.
    ///
    /// NOTE(review): `Functions::test_local` does not read this field; the
    /// port has to be part of `local_url`.
    pub port: Option<u16>,
}
196
197impl Functions {
198 /// Create a new Functions instance
199 pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
200 debug!("Initializing Functions module");
201
202 Ok(Self {
203 http_client,
204 config,
205 })
206 }
207
208 /// Invoke an Edge Function
209 ///
210 /// # Parameters
211 ///
212 /// * `function_name` - Name of the function to invoke
213 /// * `body` - Optional JSON body to send to the function
214 ///
215 /// # Examples
216 ///
217 /// ```rust,no_run
218 /// use serde_json::json;
219 ///
220 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
221 /// // Simple function call
222 /// let result = functions.invoke("hello", None).await?;
223 ///
224 /// // Function with parameters
225 /// let result = functions.invoke("process-data", Some(json!({
226 /// "user_id": 123,
227 /// "action": "update_profile"
228 /// }))).await?;
229 /// # Ok(())
230 /// # }
231 /// ```
232 pub async fn invoke(&self, function_name: &str, body: Option<Value>) -> Result<Value> {
233 self.invoke_with_options(function_name, body, None).await
234 }
235
236 /// Invoke an Edge Function with custom options
237 ///
238 /// # Parameters
239 ///
240 /// * `function_name` - Name of the function to invoke
241 /// * `body` - Optional JSON body to send to the function
242 /// * `headers` - Optional additional headers to send
243 ///
244 /// # Examples
245 ///
246 /// ```rust,no_run
247 /// use supabase::Client;
248 /// use serde_json::json;
249 /// use std::collections::HashMap;
250 ///
251 /// # async fn example() -> supabase::Result<()> {
252 /// let client = Client::new("your-project-url", "your-anon-key")?;
253 ///
254 /// let mut headers = HashMap::new();
255 /// headers.insert("X-Custom-Header".to_string(), "custom-value".to_string());
256 ///
257 /// let result = client.functions()
258 /// .invoke_with_options("my-function", Some(json!({"data": "value"})), Some(headers))
259 /// .await?;
260 /// # Ok(())
261 /// # }
262 /// ```
263 pub async fn invoke_with_options(
264 &self,
265 function_name: &str,
266 body: Option<Value>,
267 headers: Option<HashMap<String, String>>,
268 ) -> Result<Value> {
269 debug!("Invoking Edge Function: {}", function_name);
270
271 let url = format!("{}/functions/v1/{}", self.config.url, function_name);
272
273 let mut request = self
274 .http_client
275 .post(&url)
276 .header("Authorization", format!("Bearer {}", self.config.key))
277 .header("Content-Type", "application/json");
278
279 // Add custom headers if provided
280 if let Some(custom_headers) = headers {
281 for (key, value) in custom_headers {
282 request = request.header(key, value);
283 }
284 }
285
286 // Add body if provided
287 if let Some(body) = body {
288 request = request.json(&body);
289 }
290
291 let response = request.send().await?;
292
293 if !response.status().is_success() {
294 let status = response.status();
295 let error_msg = match response.text().await {
296 Ok(text) => {
297 // Try to parse error message from Supabase
298 if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
299 if let Some(message) = error_json.get("message") {
300 message.as_str().unwrap_or(&text).to_string()
301 } else {
302 text
303 }
304 } else {
305 text
306 }
307 }
308 Err(_) => format!("Function invocation failed with status: {}", status),
309 };
310 return Err(Error::functions(error_msg));
311 }
312
313 let result: Value = response.json().await?;
314 info!("Edge Function {} invoked successfully", function_name);
315
316 Ok(result)
317 }
318
319 /// Invoke an Edge Function with streaming response (native only)
320 ///
321 /// This method enables server-sent events or streaming responses from functions.
322 /// Only available on native platforms (not WASM).
323 ///
324 /// # Parameters
325 ///
326 /// * `function_name` - Name of the function to invoke
327 /// * `body` - Optional JSON body to send to the function
328 ///
329 /// # Examples
330 ///
331 /// ```rust,no_run
332 /// use serde_json::json;
333 /// use tokio_stream::StreamExt;
334 ///
335 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
336 /// let mut stream = functions.invoke_stream("streaming-function", Some(json!({
337 /// "mode": "realtime",
338 /// "duration": 60
339 /// }))).await?;
340 ///
341 /// while let Some(chunk) = stream.next().await {
342 /// match chunk {
343 /// Ok(data) => println!("Received chunk: {}", data.data),
344 /// Err(e) => println!("Stream error: {}", e),
345 /// }
346 /// }
347 /// # Ok(())
348 /// # }
349 /// ```
350 #[cfg(not(target_arch = "wasm32"))]
351 pub async fn invoke_stream(
352 &self,
353 function_name: &str,
354 body: Option<Value>,
355 ) -> Result<impl Stream<Item = Result<StreamChunk>>> {
356 debug!(
357 "Starting streaming invocation of function: {}",
358 function_name
359 );
360
361 let url = format!("{}/functions/v1/{}", self.config.url, function_name);
362
363 let mut request = self
364 .http_client
365 .post(&url)
366 .header("Authorization", format!("Bearer {}", self.config.key))
367 .header("Content-Type", "application/json")
368 .header("Accept", "text/event-stream")
369 .header("Cache-Control", "no-cache");
370
371 // Add body if provided
372 if let Some(body) = body {
373 request = request.json(&body);
374 }
375
376 let response = request.send().await?;
377
378 if !response.status().is_success() {
379 let status = response.status();
380 let error_msg = response.text().await.unwrap_or_else(|_| {
381 format!(
382 "Streaming function invocation failed with status: {}",
383 status
384 )
385 });
386 return Err(Error::functions(error_msg));
387 }
388
389 self.process_stream(response).await
390 }
391
392 /// Get metadata for a specific function
393 ///
394 /// # Parameters
395 ///
396 /// * `function_name` - Name of the function to introspect
397 ///
398 /// # Examples
399 ///
400 /// ```rust,no_run
401 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
402 /// let metadata = functions.get_function_metadata("my-function").await?;
403 /// println!("Function: {}", metadata.name);
404 /// println!("Status: {:?}", metadata.status);
405 /// println!("Memory: {:?} MB", metadata.memory_limit);
406 /// # Ok(())
407 /// # }
408 /// ```
409 pub async fn get_function_metadata(&self, function_name: &str) -> Result<FunctionMetadata> {
410 debug!("Fetching metadata for function: {}", function_name);
411
412 let url = format!(
413 "{}/functions/v1/{}/metadata",
414 self.config.url, function_name
415 );
416
417 let response = self
418 .http_client
419 .get(&url)
420 .header("Authorization", format!("Bearer {}", self.config.key))
421 .send()
422 .await?;
423
424 if !response.status().is_success() {
425 let status = response.status();
426 let error_msg = response.text().await.unwrap_or_else(|_| {
427 format!("Failed to fetch function metadata, status: {}", status)
428 });
429 return Err(Error::functions(error_msg));
430 }
431
432 let metadata: FunctionMetadata = response.json().await?;
433 info!("Retrieved metadata for function: {}", function_name);
434
435 Ok(metadata)
436 }
437
438 /// List all available functions with their metadata
439 ///
440 /// # Examples
441 ///
442 /// ```rust,no_run
443 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
444 /// let functions_list = functions.list_functions().await?;
445 /// for func in functions_list {
446 /// println!("Function: {} - Status: {:?}", func.name, func.status);
447 /// }
448 /// # Ok(())
449 /// # }
450 /// ```
451 pub async fn list_functions(&self) -> Result<Vec<FunctionMetadata>> {
452 debug!("Listing all available functions");
453
454 let url = format!("{}/functions/v1", self.config.url);
455
456 let response = self
457 .http_client
458 .get(&url)
459 .header("Authorization", format!("Bearer {}", self.config.key))
460 .send()
461 .await?;
462
463 if !response.status().is_success() {
464 let status = response.status();
465 let error_msg = response
466 .text()
467 .await
468 .unwrap_or_else(|_| format!("Failed to list functions, status: {}", status));
469 return Err(Error::functions(error_msg));
470 }
471
472 let functions: Vec<FunctionMetadata> = response.json().await?;
473 info!("Retrieved {} functions", functions.len());
474
475 Ok(functions)
476 }
477
478 /// Invoke a function with advanced options
479 ///
480 /// # Parameters
481 ///
482 /// * `function_name` - Name of the function to invoke
483 /// * `body` - Optional JSON body to send to the function
484 /// * `options` - Invocation options (headers, timeout, retry, etc.)
485 ///
486 /// # Examples
487 ///
488 /// ```rust,no_run
489 /// use supabase::functions::{InvokeOptions, RetryConfig};
490 /// use serde_json::json;
491 /// use std::{collections::HashMap, time::Duration};
492 ///
493 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
494 /// let mut headers = HashMap::new();
495 /// headers.insert("X-Priority".to_string(), "high".to_string());
496 ///
497 /// let options = InvokeOptions {
498 /// headers: Some(headers),
499 /// timeout: Some(Duration::from_secs(30)),
500 /// retry: Some(RetryConfig::default()),
501 /// streaming: false,
502 /// };
503 ///
504 /// let result = functions.invoke_with_advanced_options(
505 /// "critical-function",
506 /// Some(json!({"data": "important"})),
507 /// options
508 /// ).await?;
509 /// # Ok(())
510 /// # }
511 /// ```
512 pub async fn invoke_with_advanced_options(
513 &self,
514 function_name: &str,
515 body: Option<Value>,
516 options: InvokeOptions,
517 ) -> Result<Value> {
518 debug!("Invoking function with advanced options: {}", function_name);
519
520 let mut attempt = 0;
521 let max_attempts = options.retry.as_ref().map(|r| r.max_attempts).unwrap_or(1);
522
523 loop {
524 attempt += 1;
525
526 match self
527 .invoke_function_once(function_name, body.clone(), &options)
528 .await
529 {
530 Ok(result) => return Ok(result),
531 Err(e) if attempt < max_attempts => {
532 warn!("Function invocation attempt {} failed: {}", attempt, e);
533
534 if let Some(retry_config) = &options.retry {
535 let base_delay_ms = retry_config.delay.as_millis() as u64;
536 let backoff_factor =
537 retry_config.backoff_multiplier.powi(attempt as i32 - 1);
538 let calculated_delay_ms = (base_delay_ms as f64 * backoff_factor) as u64;
539 let max_delay_ms = retry_config.max_delay.as_millis() as u64;
540
541 let delay_ms = std::cmp::min(calculated_delay_ms, max_delay_ms);
542 async_sleep(Duration::from_millis(delay_ms)).await;
543 }
544 }
545 Err(e) => return Err(e),
546 }
547 }
548 }
549
550 /// Test a function locally (for development)
551 ///
552 /// # Parameters
553 ///
554 /// * `function_name` - Name of the function to test
555 /// * `body` - Optional JSON body to send to the function
556 /// * `local_config` - Local development configuration
557 ///
558 /// # Examples
559 ///
560 /// ```rust,no_run
561 /// use supabase::functions::LocalConfig;
562 /// use serde_json::json;
563 ///
564 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
565 /// let local_config = LocalConfig {
566 /// local_url: "http://localhost:54321".to_string(),
567 /// functions_dir: Some("./functions".to_string()),
568 /// port: Some(54321),
569 /// };
570 ///
571 /// let result = functions.test_local(
572 /// "my-function",
573 /// Some(json!({"test": true})),
574 /// local_config
575 /// ).await?;
576 /// # Ok(())
577 /// # }
578 /// ```
579 pub async fn test_local(
580 &self,
581 function_name: &str,
582 body: Option<Value>,
583 local_config: LocalConfig,
584 ) -> Result<Value> {
585 debug!("Testing function locally: {}", function_name);
586
587 let url = format!("{}/functions/v1/{}", local_config.local_url, function_name);
588
589 let mut request = self
590 .http_client
591 .post(&url)
592 .header("Authorization", format!("Bearer {}", self.config.key))
593 .header("Content-Type", "application/json")
594 .header("X-Local-Test", "true");
595
596 if let Some(body) = body {
597 request = request.json(&body);
598 }
599
600 let response = request.send().await?;
601
602 if !response.status().is_success() {
603 let status = response.status();
604 let error_msg = response
605 .text()
606 .await
607 .unwrap_or_else(|_| format!("Local function test failed with status: {}", status));
608 return Err(Error::functions(error_msg));
609 }
610
611 let result: Value = response.json().await?;
612 info!("Local function test completed: {}", function_name);
613
614 Ok(result)
615 }
616
617 /// Get the base Functions URL
618 pub fn functions_url(&self) -> String {
619 format!("{}/functions/v1", self.config.url)
620 }
621
622 // Private helper methods
623
624 async fn invoke_function_once(
625 &self,
626 function_name: &str,
627 body: Option<Value>,
628 options: &InvokeOptions,
629 ) -> Result<Value> {
630 let url = format!("{}/functions/v1/{}", self.config.url, function_name);
631
632 let mut request = self
633 .http_client
634 .post(&url)
635 .header("Authorization", format!("Bearer {}", self.config.key))
636 .header("Content-Type", "application/json");
637
638 // Add custom headers
639 if let Some(custom_headers) = &options.headers {
640 for (key, value) in custom_headers {
641 request = request.header(key, value);
642 }
643 }
644
645 // Set timeout
646 if let Some(timeout) = options.timeout {
647 request = request.timeout(timeout);
648 }
649
650 // Add body if provided
651 if let Some(body) = body {
652 request = request.json(&body);
653 }
654
655 let response = request.send().await?;
656
657 if !response.status().is_success() {
658 let status = response.status();
659 let error_msg = match response.text().await {
660 Ok(text) => {
661 // Enhanced error parsing
662 if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
663 self.parse_function_error(&error_json)
664 } else {
665 text
666 }
667 }
668 Err(_) => format!("Function invocation failed with status: {}", status),
669 };
670 return Err(Error::functions(error_msg));
671 }
672
673 let result: Value = response.json().await?;
674 Ok(result)
675 }
676
677 #[cfg(not(target_arch = "wasm32"))]
678 async fn process_stream(
679 &self,
680 response: Response,
681 ) -> Result<impl Stream<Item = Result<StreamChunk>>> {
682 use tokio_stream::StreamExt;
683
684 // Simplified streaming - read response as text and split by lines
685 let text = response.text().await?;
686 let lines: Vec<String> = text.lines().map(|s| s.to_string()).collect();
687
688 let stream = tokio_stream::iter(lines.into_iter().map(Ok::<String, Error>));
689
690 Ok(
691 stream.map(|line_result: Result<String>| -> Result<StreamChunk> {
692 let line = line_result?;
693
694 // Parse Server-Sent Events format
695 if let Some(data_str) = line.strip_prefix("data: ") {
696 // Remove "data: " prefix
697 if data_str == "[DONE]" {
698 return Ok(StreamChunk {
699 data: Value::Null,
700 sequence: None,
701 is_final: true,
702 });
703 }
704
705 let data: Value = serde_json::from_str(data_str).map_err(|e| {
706 Error::functions(format!("Failed to parse stream data: {}", e))
707 })?;
708
709 Ok(StreamChunk {
710 data,
711 sequence: None,
712 is_final: false,
713 })
714 } else if !line.is_empty() && !line.starts_with(':') {
715 // Skip non-data lines (event:, id:, etc.) and empty lines
716 Ok(StreamChunk {
717 data: Value::Null,
718 sequence: None,
719 is_final: false,
720 })
721 } else {
722 Ok(StreamChunk {
723 data: Value::Null,
724 sequence: None,
725 is_final: false,
726 })
727 }
728 }),
729 )
730 }
731
732 fn parse_function_error(&self, error_json: &Value) -> String {
733 // Enhanced error parsing for different error formats
734 if let Some(message) = error_json.get("error") {
735 if let Some(details) = message.get("message") {
736 return details.as_str().unwrap_or("Unknown error").to_string();
737 }
738 return message.as_str().unwrap_or("Unknown error").to_string();
739 }
740
741 if let Some(message) = error_json.get("message") {
742 return message.as_str().unwrap_or("Unknown error").to_string();
743 }
744
745 if let Some(details) = error_json.get("details") {
746 return details.as_str().unwrap_or("Unknown error").to_string();
747 }
748
749 "Function execution failed".to_string()
750 }
751}
752
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AuthConfig, DatabaseConfig, HttpConfig, StorageConfig, SupabaseConfig};

    /// Builds a `Functions` handle pointing at a local Supabase URL with
    /// default sub-configurations; no network traffic is involved.
    fn create_test_functions() -> Functions {
        let settings = SupabaseConfig {
            url: "http://localhost:54321".to_string(),
            key: "test-key".to_string(),
            service_role_key: None,
            http_config: HttpConfig::default(),
            auth_config: AuthConfig::default(),
            database_config: DatabaseConfig::default(),
            storage_config: StorageConfig::default(),
        };

        Functions::new(Arc::new(settings), Arc::new(HttpClient::new())).unwrap()
    }

    /// A freshly constructed handle reports the expected base URL.
    #[test]
    fn test_functions_creation() {
        let base_url = create_test_functions().functions_url();
        assert_eq!(base_url, "http://localhost:54321/functions/v1");
    }

    /// The base URL is the configured project URL plus `/functions/v1`.
    #[test]
    fn test_functions_url_generation() {
        let handle = create_test_functions();
        let expected = "http://localhost:54321/functions/v1";
        assert_eq!(handle.functions_url(), expected);
    }
}