supabase/functions.rs
//! Edge Functions module for Supabase
//!
//! This module provides functionality to invoke Supabase Edge Functions.
//! Edge Functions are server-side TypeScript functions that run on the edge,
//! close to your users for reduced latency.
//!
//! ## Features
//!
//! - **Standard Invocation**: Traditional request/response function calls
//! - **Streaming Responses**: Server-sent events and streaming data
//! - **Function Metadata**: Introspection and function discovery
//! - **Local Development**: Testing utilities for local functions
//! - **Enhanced Error Handling**: Detailed error context and retry logic
14
15use crate::{
16 error::{Error, Result},
17 types::SupabaseConfig,
18};
19use reqwest::{Client as HttpClient, Response};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22use std::{collections::HashMap, sync::Arc, time::Duration};
23#[cfg(not(target_arch = "wasm32"))]
24use tokio_stream::Stream;
25use tracing::{debug, info, warn};
26
27/// Edge Functions client for invoking serverless functions
28///
29/// # Examples
30///
31/// Basic function invocation:
32///
33/// ```rust,no_run
34/// use supabase::Client;
35/// use serde_json::json;
36///
37/// # async fn example() -> supabase::Result<()> {
38/// let client = Client::new("your-project-url", "your-anon-key")?;
39///
40/// // Invoke a function with parameters
41/// let result = client.functions()
42/// .invoke("hello-world", Some(json!({"name": "World"})))
43/// .await?;
44///
45/// println!("Function result: {}", result);
46/// # Ok(())
47/// # }
48/// ```
49///
50/// Streaming function responses:
51///
52/// ```rust,no_run
53/// use supabase::Client;
54/// use serde_json::json;
55/// use tokio_stream::StreamExt;
56///
57/// # async fn example() -> supabase::Result<()> {
58/// let client = Client::new("your-project-url", "your-anon-key")?;
59///
60/// // Stream function responses
61/// let mut stream = client.functions()
62/// .invoke_stream("data-processor", Some(json!({"batch_size": 100})))
63/// .await?;
64///
65/// while let Some(chunk) = stream.next().await {
66/// match chunk {
67/// Ok(data) => println!("Received: {:?}", data),
68/// Err(e) => println!("Stream error: {}", e),
69/// }
70/// }
71/// # Ok(())
72/// # }
73/// ```
74#[derive(Debug, Clone)]
75pub struct Functions {
76 http_client: Arc<HttpClient>,
77 config: Arc<SupabaseConfig>,
78}
79
80/// Function metadata and introspection information
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct FunctionMetadata {
83 /// Function name
84 pub name: String,
85 /// Function description
86 pub description: Option<String>,
87 /// Function version
88 pub version: Option<String>,
89 /// Runtime environment
90 pub runtime: Option<String>,
91 /// Memory limit in MB
92 pub memory_limit: Option<u32>,
93 /// Timeout in seconds
94 pub timeout: Option<u32>,
95 /// Environment variables (non-sensitive)
96 pub env_vars: HashMap<String, String>,
97 /// Function status
98 pub status: FunctionStatus,
99 /// Creation timestamp
100 pub created_at: Option<String>,
101 /// Last modified timestamp
102 pub updated_at: Option<String>,
103}
104
105/// Function execution status
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108pub enum FunctionStatus {
109 /// Function is active and can be invoked
110 Active,
111 /// Function is paused/disabled
112 Inactive,
113 /// Function is deploying
114 Deploying,
115 /// Function deployment failed
116 Failed,
117}
118
119/// Configuration for function invocation
120#[derive(Debug, Clone, Default)]
121pub struct InvokeOptions {
122 /// Additional headers to send
123 pub headers: Option<HashMap<String, String>>,
124 /// Function timeout override
125 pub timeout: Option<Duration>,
126 /// Retry configuration
127 pub retry: Option<RetryConfig>,
128 /// Enable streaming response
129 pub streaming: bool,
130}
131
/// Retry policy for function invocation.
///
/// The wait before retry number `k` (1-based) is
/// `delay * backoff_multiplier^(k - 1)`, capped at `max_delay`.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// Total number of attempts, counting the initial call
    /// (e.g. `3` means one call plus at most two retries).
    pub max_attempts: u32,
    /// Wait before the first retry.
    pub delay: Duration,
    /// Factor the wait is multiplied by after each failed attempt.
    pub backoff_multiplier: f64,
    /// Upper bound for the wait between two attempts.
    pub max_delay: Duration,
}

impl Default for RetryConfig {
    /// Three attempts, starting at a one-second wait that doubles each time
    /// and never exceeds thirty seconds.
    fn default() -> Self {
        RetryConfig {
            max_attempts: 3,
            delay: Duration::from_secs(1),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(30),
        }
    }
}
155
156/// Streaming chunk from function response
157#[derive(Debug, Clone)]
158pub struct StreamChunk {
159 /// Chunk data
160 pub data: Value,
161 /// Chunk sequence number
162 pub sequence: Option<u64>,
163 /// Whether this is the last chunk
164 pub is_final: bool,
165}
166
/// Settings describing a locally running functions server.
#[derive(Clone, Debug)]
pub struct LocalConfig {
    /// Base URL of the local functions server; the function path is appended to it.
    pub local_url: String,
    /// Directory holding the local function sources.
    ///
    /// NOTE(review): not read anywhere in this file; confirm where it is consumed.
    pub functions_dir: Option<String>,
    /// Port of the local development server.
    ///
    /// NOTE(review): not read anywhere in this file; `local_url` alone decides
    /// the request target here.
    pub port: Option<u16>,
}
177
178impl Functions {
179 /// Create a new Functions instance
180 pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
181 debug!("Initializing Functions module");
182
183 Ok(Self {
184 http_client,
185 config,
186 })
187 }
188
189 /// Invoke an Edge Function
190 ///
191 /// # Parameters
192 ///
193 /// * `function_name` - Name of the function to invoke
194 /// * `body` - Optional JSON body to send to the function
195 ///
196 /// # Examples
197 ///
198 /// ```rust,no_run
199 /// use serde_json::json;
200 ///
201 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
202 /// // Simple function call
203 /// let result = functions.invoke("hello", None).await?;
204 ///
205 /// // Function with parameters
206 /// let result = functions.invoke("process-data", Some(json!({
207 /// "user_id": 123,
208 /// "action": "update_profile"
209 /// }))).await?;
210 /// # Ok(())
211 /// # }
212 /// ```
213 pub async fn invoke(&self, function_name: &str, body: Option<Value>) -> Result<Value> {
214 self.invoke_with_options(function_name, body, None).await
215 }
216
217 /// Invoke an Edge Function with custom options
218 ///
219 /// # Parameters
220 ///
221 /// * `function_name` - Name of the function to invoke
222 /// * `body` - Optional JSON body to send to the function
223 /// * `headers` - Optional additional headers to send
224 ///
225 /// # Examples
226 ///
227 /// ```rust,no_run
228 /// use supabase::Client;
229 /// use serde_json::json;
230 /// use std::collections::HashMap;
231 ///
232 /// # async fn example() -> supabase::Result<()> {
233 /// let client = Client::new("your-project-url", "your-anon-key")?;
234 ///
235 /// let mut headers = HashMap::new();
236 /// headers.insert("X-Custom-Header".to_string(), "custom-value".to_string());
237 ///
238 /// let result = client.functions()
239 /// .invoke_with_options("my-function", Some(json!({"data": "value"})), Some(headers))
240 /// .await?;
241 /// # Ok(())
242 /// # }
243 /// ```
244 pub async fn invoke_with_options(
245 &self,
246 function_name: &str,
247 body: Option<Value>,
248 headers: Option<HashMap<String, String>>,
249 ) -> Result<Value> {
250 debug!("Invoking Edge Function: {}", function_name);
251
252 let url = format!("{}/functions/v1/{}", self.config.url, function_name);
253
254 let mut request = self
255 .http_client
256 .post(&url)
257 .header("Authorization", format!("Bearer {}", self.config.key))
258 .header("Content-Type", "application/json");
259
260 // Add custom headers if provided
261 if let Some(custom_headers) = headers {
262 for (key, value) in custom_headers {
263 request = request.header(key, value);
264 }
265 }
266
267 // Add body if provided
268 if let Some(body) = body {
269 request = request.json(&body);
270 }
271
272 let response = request.send().await?;
273
274 if !response.status().is_success() {
275 let status = response.status();
276 let error_msg = match response.text().await {
277 Ok(text) => {
278 // Try to parse error message from Supabase
279 if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
280 if let Some(message) = error_json.get("message") {
281 message.as_str().unwrap_or(&text).to_string()
282 } else {
283 text
284 }
285 } else {
286 text
287 }
288 }
289 Err(_) => format!("Function invocation failed with status: {}", status),
290 };
291 return Err(Error::functions(error_msg));
292 }
293
294 let result: Value = response.json().await?;
295 info!("Edge Function {} invoked successfully", function_name);
296
297 Ok(result)
298 }
299
300 /// Invoke an Edge Function with streaming response (native only)
301 ///
302 /// This method enables server-sent events or streaming responses from functions.
303 /// Only available on native platforms (not WASM).
304 ///
305 /// # Parameters
306 ///
307 /// * `function_name` - Name of the function to invoke
308 /// * `body` - Optional JSON body to send to the function
309 ///
310 /// # Examples
311 ///
312 /// ```rust,no_run
313 /// use serde_json::json;
314 /// use tokio_stream::StreamExt;
315 ///
316 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
317 /// let mut stream = functions.invoke_stream("streaming-function", Some(json!({
318 /// "mode": "realtime",
319 /// "duration": 60
320 /// }))).await?;
321 ///
322 /// while let Some(chunk) = stream.next().await {
323 /// match chunk {
324 /// Ok(data) => println!("Received chunk: {}", data.data),
325 /// Err(e) => println!("Stream error: {}", e),
326 /// }
327 /// }
328 /// # Ok(())
329 /// # }
330 /// ```
331 #[cfg(not(target_arch = "wasm32"))]
332 pub async fn invoke_stream(
333 &self,
334 function_name: &str,
335 body: Option<Value>,
336 ) -> Result<impl Stream<Item = Result<StreamChunk>>> {
337 debug!(
338 "Starting streaming invocation of function: {}",
339 function_name
340 );
341
342 let url = format!("{}/functions/v1/{}", self.config.url, function_name);
343
344 let mut request = self
345 .http_client
346 .post(&url)
347 .header("Authorization", format!("Bearer {}", self.config.key))
348 .header("Content-Type", "application/json")
349 .header("Accept", "text/event-stream")
350 .header("Cache-Control", "no-cache");
351
352 // Add body if provided
353 if let Some(body) = body {
354 request = request.json(&body);
355 }
356
357 let response = request.send().await?;
358
359 if !response.status().is_success() {
360 let status = response.status();
361 let error_msg = response.text().await.unwrap_or_else(|_| {
362 format!(
363 "Streaming function invocation failed with status: {}",
364 status
365 )
366 });
367 return Err(Error::functions(error_msg));
368 }
369
370 self.process_stream(response).await
371 }
372
373 /// Get metadata for a specific function
374 ///
375 /// # Parameters
376 ///
377 /// * `function_name` - Name of the function to introspect
378 ///
379 /// # Examples
380 ///
381 /// ```rust,no_run
382 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
383 /// let metadata = functions.get_function_metadata("my-function").await?;
384 /// println!("Function: {}", metadata.name);
385 /// println!("Status: {:?}", metadata.status);
386 /// println!("Memory: {:?} MB", metadata.memory_limit);
387 /// # Ok(())
388 /// # }
389 /// ```
390 pub async fn get_function_metadata(&self, function_name: &str) -> Result<FunctionMetadata> {
391 debug!("Fetching metadata for function: {}", function_name);
392
393 let url = format!(
394 "{}/functions/v1/{}/metadata",
395 self.config.url, function_name
396 );
397
398 let response = self
399 .http_client
400 .get(&url)
401 .header("Authorization", format!("Bearer {}", self.config.key))
402 .send()
403 .await?;
404
405 if !response.status().is_success() {
406 let status = response.status();
407 let error_msg = response.text().await.unwrap_or_else(|_| {
408 format!("Failed to fetch function metadata, status: {}", status)
409 });
410 return Err(Error::functions(error_msg));
411 }
412
413 let metadata: FunctionMetadata = response.json().await?;
414 info!("Retrieved metadata for function: {}", function_name);
415
416 Ok(metadata)
417 }
418
419 /// List all available functions with their metadata
420 ///
421 /// # Examples
422 ///
423 /// ```rust,no_run
424 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
425 /// let functions_list = functions.list_functions().await?;
426 /// for func in functions_list {
427 /// println!("Function: {} - Status: {:?}", func.name, func.status);
428 /// }
429 /// # Ok(())
430 /// # }
431 /// ```
432 pub async fn list_functions(&self) -> Result<Vec<FunctionMetadata>> {
433 debug!("Listing all available functions");
434
435 let url = format!("{}/functions/v1", self.config.url);
436
437 let response = self
438 .http_client
439 .get(&url)
440 .header("Authorization", format!("Bearer {}", self.config.key))
441 .send()
442 .await?;
443
444 if !response.status().is_success() {
445 let status = response.status();
446 let error_msg = response
447 .text()
448 .await
449 .unwrap_or_else(|_| format!("Failed to list functions, status: {}", status));
450 return Err(Error::functions(error_msg));
451 }
452
453 let functions: Vec<FunctionMetadata> = response.json().await?;
454 info!("Retrieved {} functions", functions.len());
455
456 Ok(functions)
457 }
458
459 /// Invoke a function with advanced options
460 ///
461 /// # Parameters
462 ///
463 /// * `function_name` - Name of the function to invoke
464 /// * `body` - Optional JSON body to send to the function
465 /// * `options` - Invocation options (headers, timeout, retry, etc.)
466 ///
467 /// # Examples
468 ///
469 /// ```rust,no_run
470 /// use supabase::functions::{InvokeOptions, RetryConfig};
471 /// use serde_json::json;
472 /// use std::{collections::HashMap, time::Duration};
473 ///
474 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
475 /// let mut headers = HashMap::new();
476 /// headers.insert("X-Priority".to_string(), "high".to_string());
477 ///
478 /// let options = InvokeOptions {
479 /// headers: Some(headers),
480 /// timeout: Some(Duration::from_secs(30)),
481 /// retry: Some(RetryConfig::default()),
482 /// streaming: false,
483 /// };
484 ///
485 /// let result = functions.invoke_with_advanced_options(
486 /// "critical-function",
487 /// Some(json!({"data": "important"})),
488 /// options
489 /// ).await?;
490 /// # Ok(())
491 /// # }
492 /// ```
493 pub async fn invoke_with_advanced_options(
494 &self,
495 function_name: &str,
496 body: Option<Value>,
497 options: InvokeOptions,
498 ) -> Result<Value> {
499 debug!("Invoking function with advanced options: {}", function_name);
500
501 let mut attempt = 0;
502 let max_attempts = options.retry.as_ref().map(|r| r.max_attempts).unwrap_or(1);
503
504 loop {
505 attempt += 1;
506
507 match self
508 .invoke_function_once(function_name, body.clone(), &options)
509 .await
510 {
511 Ok(result) => return Ok(result),
512 Err(e) if attempt < max_attempts => {
513 warn!("Function invocation attempt {} failed: {}", attempt, e);
514
515 if let Some(retry_config) = &options.retry {
516 let base_delay_ms = retry_config.delay.as_millis() as u64;
517 let backoff_factor =
518 retry_config.backoff_multiplier.powi(attempt as i32 - 1);
519 let calculated_delay_ms = (base_delay_ms as f64 * backoff_factor) as u64;
520 let max_delay_ms = retry_config.max_delay.as_millis() as u64;
521
522 let delay_ms = std::cmp::min(calculated_delay_ms, max_delay_ms);
523 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
524 }
525 }
526 Err(e) => return Err(e),
527 }
528 }
529 }
530
531 /// Test a function locally (for development)
532 ///
533 /// # Parameters
534 ///
535 /// * `function_name` - Name of the function to test
536 /// * `body` - Optional JSON body to send to the function
537 /// * `local_config` - Local development configuration
538 ///
539 /// # Examples
540 ///
541 /// ```rust,no_run
542 /// use supabase::functions::LocalConfig;
543 /// use serde_json::json;
544 ///
545 /// # async fn example(functions: &supabase::Functions) -> supabase::Result<()> {
546 /// let local_config = LocalConfig {
547 /// local_url: "http://localhost:54321".to_string(),
548 /// functions_dir: Some("./functions".to_string()),
549 /// port: Some(54321),
550 /// };
551 ///
552 /// let result = functions.test_local(
553 /// "my-function",
554 /// Some(json!({"test": true})),
555 /// local_config
556 /// ).await?;
557 /// # Ok(())
558 /// # }
559 /// ```
560 pub async fn test_local(
561 &self,
562 function_name: &str,
563 body: Option<Value>,
564 local_config: LocalConfig,
565 ) -> Result<Value> {
566 debug!("Testing function locally: {}", function_name);
567
568 let url = format!("{}/functions/v1/{}", local_config.local_url, function_name);
569
570 let mut request = self
571 .http_client
572 .post(&url)
573 .header("Authorization", format!("Bearer {}", self.config.key))
574 .header("Content-Type", "application/json")
575 .header("X-Local-Test", "true");
576
577 if let Some(body) = body {
578 request = request.json(&body);
579 }
580
581 let response = request.send().await?;
582
583 if !response.status().is_success() {
584 let status = response.status();
585 let error_msg = response
586 .text()
587 .await
588 .unwrap_or_else(|_| format!("Local function test failed with status: {}", status));
589 return Err(Error::functions(error_msg));
590 }
591
592 let result: Value = response.json().await?;
593 info!("Local function test completed: {}", function_name);
594
595 Ok(result)
596 }
597
598 /// Get the base Functions URL
599 pub fn functions_url(&self) -> String {
600 format!("{}/functions/v1", self.config.url)
601 }
602
603 // Private helper methods
604
605 async fn invoke_function_once(
606 &self,
607 function_name: &str,
608 body: Option<Value>,
609 options: &InvokeOptions,
610 ) -> Result<Value> {
611 let url = format!("{}/functions/v1/{}", self.config.url, function_name);
612
613 let mut request = self
614 .http_client
615 .post(&url)
616 .header("Authorization", format!("Bearer {}", self.config.key))
617 .header("Content-Type", "application/json");
618
619 // Add custom headers
620 if let Some(custom_headers) = &options.headers {
621 for (key, value) in custom_headers {
622 request = request.header(key, value);
623 }
624 }
625
626 // Set timeout
627 if let Some(timeout) = options.timeout {
628 request = request.timeout(timeout);
629 }
630
631 // Add body if provided
632 if let Some(body) = body {
633 request = request.json(&body);
634 }
635
636 let response = request.send().await?;
637
638 if !response.status().is_success() {
639 let status = response.status();
640 let error_msg = match response.text().await {
641 Ok(text) => {
642 // Enhanced error parsing
643 if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
644 self.parse_function_error(&error_json)
645 } else {
646 text
647 }
648 }
649 Err(_) => format!("Function invocation failed with status: {}", status),
650 };
651 return Err(Error::functions(error_msg));
652 }
653
654 let result: Value = response.json().await?;
655 Ok(result)
656 }
657
658 #[cfg(not(target_arch = "wasm32"))]
659 async fn process_stream(
660 &self,
661 response: Response,
662 ) -> Result<impl Stream<Item = Result<StreamChunk>>> {
663 use tokio_stream::StreamExt;
664
665 // Simplified streaming - read response as text and split by lines
666 let text = response.text().await?;
667 let lines: Vec<String> = text.lines().map(|s| s.to_string()).collect();
668
669 let stream = tokio_stream::iter(lines.into_iter().map(Ok::<String, Error>));
670
671 Ok(
672 stream.map(|line_result: Result<String>| -> Result<StreamChunk> {
673 let line = line_result?;
674
675 // Parse Server-Sent Events format
676 if let Some(data_str) = line.strip_prefix("data: ") {
677 // Remove "data: " prefix
678 if data_str == "[DONE]" {
679 return Ok(StreamChunk {
680 data: Value::Null,
681 sequence: None,
682 is_final: true,
683 });
684 }
685
686 let data: Value = serde_json::from_str(data_str).map_err(|e| {
687 Error::functions(format!("Failed to parse stream data: {}", e))
688 })?;
689
690 Ok(StreamChunk {
691 data,
692 sequence: None,
693 is_final: false,
694 })
695 } else if !line.is_empty() && !line.starts_with(':') {
696 // Skip non-data lines (event:, id:, etc.) and empty lines
697 Ok(StreamChunk {
698 data: Value::Null,
699 sequence: None,
700 is_final: false,
701 })
702 } else {
703 Ok(StreamChunk {
704 data: Value::Null,
705 sequence: None,
706 is_final: false,
707 })
708 }
709 }),
710 )
711 }
712
713 fn parse_function_error(&self, error_json: &Value) -> String {
714 // Enhanced error parsing for different error formats
715 if let Some(message) = error_json.get("error") {
716 if let Some(details) = message.get("message") {
717 return details.as_str().unwrap_or("Unknown error").to_string();
718 }
719 return message.as_str().unwrap_or("Unknown error").to_string();
720 }
721
722 if let Some(message) = error_json.get("message") {
723 return message.as_str().unwrap_or("Unknown error").to_string();
724 }
725
726 if let Some(details) = error_json.get("details") {
727 return details.as_str().unwrap_or("Unknown error").to_string();
728 }
729
730 "Function execution failed".to_string()
731 }
732}
733
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AuthConfig, DatabaseConfig, HttpConfig, StorageConfig, SupabaseConfig};
    use serde_json::json;

    /// Build a `Functions` client pointing at a local test URL.
    fn create_test_functions() -> Functions {
        let config = Arc::new(SupabaseConfig {
            url: "http://localhost:54321".to_string(),
            key: "test-key".to_string(),
            service_role_key: None,
            http_config: HttpConfig::default(),
            auth_config: AuthConfig::default(),
            database_config: DatabaseConfig::default(),
            storage_config: StorageConfig::default(),
        });

        let http_client = Arc::new(HttpClient::new());
        Functions::new(config, http_client).unwrap()
    }

    #[test]
    fn test_functions_creation() {
        // Construction succeeds and a clone shares the same configuration.
        let functions = create_test_functions();
        let cloned = functions.clone();
        assert_eq!(cloned.functions_url(), functions.functions_url());
    }

    #[test]
    fn test_functions_url_generation() {
        let functions = create_test_functions();
        assert_eq!(
            functions.functions_url(),
            "http://localhost:54321/functions/v1"
        );
    }

    #[test]
    fn test_retry_config_default() {
        let retry = RetryConfig::default();
        assert_eq!(retry.max_attempts, 3);
        assert_eq!(retry.delay, Duration::from_millis(1000));
        assert_eq!(retry.backoff_multiplier, 2.0);
        assert_eq!(retry.max_delay, Duration::from_secs(30));
    }

    #[test]
    fn test_invoke_options_default() {
        let options = InvokeOptions::default();
        assert!(options.headers.is_none());
        assert!(options.timeout.is_none());
        assert!(options.retry.is_none());
        assert!(!options.streaming);
    }

    #[test]
    fn test_parse_function_error_known_shapes() {
        let functions = create_test_functions();

        assert_eq!(
            functions.parse_function_error(&json!({"error": {"message": "nested"}})),
            "nested"
        );
        assert_eq!(
            functions.parse_function_error(&json!({"error": "flat"})),
            "flat"
        );
        assert_eq!(
            functions.parse_function_error(&json!({"message": "top-level"})),
            "top-level"
        );
        assert_eq!(
            functions.parse_function_error(&json!({"details": "more info"})),
            "more info"
        );
        assert_eq!(
            functions.parse_function_error(&json!({})),
            "Function execution failed"
        );
    }
}