use crate::{
error::{Error, Result},
types::SupabaseConfig,
};
use reqwest::Client as HttpClient;
#[cfg(not(target_arch = "wasm32"))]
use reqwest::Response;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::HashMap, sync::Arc, time::Duration};
#[cfg(not(target_arch = "wasm32"))]
use tokio_stream::Stream;
use tracing::{debug, info, warn};
/// Suspends the current task for `duration` using Tokio's timer (all non-wasm32 targets).
#[cfg(not(target_arch = "wasm32"))]
async fn async_sleep(duration: Duration) {
tokio::time::sleep(duration).await;
}
/// Suspends the current task for `duration` using `gloo_timers`
/// (wasm32 builds with the `wasm` feature enabled).
#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
async fn async_sleep(duration: Duration) {
use gloo_timers::future::sleep as gloo_sleep;
gloo_sleep(duration).await;
}
/// Fallback for wasm32 builds without the `wasm` feature: returns immediately
/// without waiting, so any retry delay requested through this function is skipped.
#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))]
async fn async_sleep(_duration: Duration) {
}
/// Client for invoking and inspecting Edge Functions.
///
/// Both fields are `Arc`s, so cloning a `Functions` value only bumps reference counts.
#[derive(Debug, Clone)]
pub struct Functions {
/// Shared `reqwest` client used for every request issued by this module.
http_client: Arc<HttpClient>,
/// Shared configuration; supplies the base `url` and the `key` sent as the bearer token.
config: Arc<SupabaseConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunctionMetadata {
pub name: String,
pub description: Option<String>,
pub version: Option<String>,
pub runtime: Option<String>,
pub memory_limit: Option<u32>,
pub timeout: Option<u32>,
pub env_vars: HashMap<String, String>,
pub status: FunctionStatus,
pub created_at: Option<String>,
pub updated_at: Option<String>,
}
/// Lifecycle status reported for a function.
///
/// Because of `rename_all = "lowercase"`, variants are (de)serialized as the lowercase
/// strings `"active"`, `"inactive"`, `"deploying"` and `"failed"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FunctionStatus {
/// Serialized as `"active"`.
Active,
/// Serialized as `"inactive"`.
Inactive,
/// Serialized as `"deploying"`.
Deploying,
/// Serialized as `"failed"`.
Failed,
}
/// Per-call options accepted by `invoke_with_advanced_options`.
///
/// `Default` yields no extra headers, no timeout, no retries and `streaming == false`.
#[derive(Debug, Clone, Default)]
pub struct InvokeOptions {
/// Extra request headers added after the default `Authorization` and `Content-Type` headers.
pub headers: Option<HashMap<String, String>>,
/// Per-request timeout passed to the HTTP request builder; `None` leaves the client default.
pub timeout: Option<Duration>,
/// Retry policy; `None` means the function is invoked exactly once.
pub retry: Option<RetryConfig>,
/// NOTE(review): this flag is not read by any code visible in this file; confirm its intended use.
pub streaming: bool,
}
/// Retry policy with exponential backoff for function invocations.
///
/// The wait before retry number `n` (1-based) is `delay * backoff_multiplier^(n - 1)`,
/// capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Total number of attempts, counting the initial one.
    pub max_attempts: u32,
    /// Base delay applied before the first retry.
    pub delay: Duration,
    /// Factor by which the delay grows after each failed attempt.
    pub backoff_multiplier: f64,
    /// Upper bound on the delay between two attempts.
    pub max_delay: Duration,
}

impl Default for RetryConfig {
    /// Three attempts, starting at a one-second delay that doubles each time,
    /// never waiting longer than thirty seconds.
    fn default() -> Self {
        let delay = Duration::from_secs(1);
        let max_delay = Duration::from_secs(30);
        RetryConfig {
            max_attempts: 3,
            delay,
            backoff_multiplier: 2.0,
            max_delay,
        }
    }
}
/// One item produced by a streaming invocation.
#[derive(Debug, Clone)]
pub struct StreamChunk {
/// JSON payload of the chunk; `Value::Null` for chunks that carry no payload.
pub data: Value,
/// Optional sequence number. NOTE(review): the parser visible in this file always sets `None`.
pub sequence: Option<u64>,
/// `true` for the chunk that marks the end of the stream (the `[DONE]` sentinel).
pub is_final: bool,
}
/// Settings for invoking a function against a locally running server (see `test_local`).
#[derive(Debug, Clone)]
pub struct LocalConfig {
/// Base URL of the local server; `/functions/v1/<name>` is appended to it.
pub local_url: String,
/// NOTE(review): not read by any code visible in this file; confirm its intended use.
pub functions_dir: Option<String>,
/// NOTE(review): not read by any code visible in this file; confirm its intended use.
pub port: Option<u16>,
}
impl Functions {
/// Builds a `Functions` client from shared configuration and a shared HTTP client.
///
/// # Errors
///
/// The current implementation always returns `Ok`; the `Result` return type is
/// part of the existing interface.
pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
    debug!("Initializing Functions module");
    let functions = Self {
        http_client,
        config,
    };
    Ok(functions)
}
/// Invokes the named function with an optional JSON body and no extra headers.
///
/// Delegates to `invoke_with_options` with `headers` set to `None` and returns
/// whatever that call returns.
pub async fn invoke(&self, function_name: &str, body: Option<Value>) -> Result<Value> {
self.invoke_with_options(function_name, body, None).await
}
pub async fn invoke_with_options(
&self,
function_name: &str,
body: Option<Value>,
headers: Option<HashMap<String, String>>,
) -> Result<Value> {
debug!("Invoking Edge Function: {}", function_name);
let url = format!("{}/functions/v1/{}", self.config.url, function_name);
let mut request = self
.http_client
.post(&url)
.header("Authorization", format!("Bearer {}", self.config.key))
.header("Content-Type", "application/json");
if let Some(custom_headers) = headers {
for (key, value) in custom_headers {
request = request.header(key, value);
}
}
if let Some(body) = body {
request = request.json(&body);
}
let response = request.send().await?;
if !response.status().is_success() {
let status = response.status();
let error_msg = match response.text().await {
Ok(text) => {
if let Ok(error_json) = serde_json::from_str::<Value>(&text) {
if let Some(message) = error_json.get("message") {
message.as_str().unwrap_or(&text).to_string()
} else {
text
}
} else {
text
}
}
Err(_) => format!("Function invocation failed with status: {}", status),
};
return Err(Error::functions(error_msg));
}
let result: Value = response.json().await?;
info!("Edge Function {} invoked successfully", function_name);
Ok(result)
}
/// Invokes the named function and returns its response as a stream of chunks.
///
/// Sends `POST {url}/functions/v1/{function_name}` with `Accept: text/event-stream`
/// and `Cache-Control: no-cache`, then hands the response to `process_stream`.
/// Only available on non-wasm32 targets.
///
/// # Errors
///
/// * Transport failures are propagated through `?`.
/// * A non-success status yields `Error::functions` carrying the response body, or a
///   generic message containing the status code when the body is empty or unreadable.
#[cfg(not(target_arch = "wasm32"))]
pub async fn invoke_stream(
    &self,
    function_name: &str,
    body: Option<Value>,
) -> Result<impl Stream<Item = Result<StreamChunk>>> {
    debug!(
        "Starting streaming invocation of function: {}",
        function_name
    );
    let url = format!("{}/functions/v1/{}", self.config.url, function_name);
    let request = self
        .http_client
        .post(&url)
        .header("Authorization", format!("Bearer {}", self.config.key))
        .header("Content-Type", "application/json")
        .header("Accept", "text/event-stream")
        .header("Cache-Control", "no-cache");
    let request = match body {
        Some(payload) => request.json(&payload),
        None => request,
    };
    let response = request.send().await?;
    let status = response.status();
    if !status.is_success() {
        // An empty body would otherwise produce an empty error message.
        let text = response.text().await.unwrap_or_default();
        let error_msg = if text.trim().is_empty() {
            format!(
                "Streaming function invocation failed with status: {}",
                status
            )
        } else {
            text
        };
        return Err(Error::functions(error_msg));
    }
    self.process_stream(response).await
}
/// Fetches metadata for one function via `GET {url}/functions/v1/{function_name}/metadata`.
///
/// # Errors
///
/// * Transport failures and a body that does not decode into `FunctionMetadata`
///   are propagated through `?`.
/// * A non-success status yields `Error::functions` carrying the response body, or a
///   generic message containing the status code when the body is empty or unreadable.
pub async fn get_function_metadata(&self, function_name: &str) -> Result<FunctionMetadata> {
    debug!("Fetching metadata for function: {}", function_name);
    let url = format!(
        "{}/functions/v1/{}/metadata",
        self.config.url, function_name
    );
    let response = self
        .http_client
        .get(&url)
        .header("Authorization", format!("Bearer {}", self.config.key))
        .send()
        .await?;
    let status = response.status();
    if !status.is_success() {
        // An empty body would otherwise produce an empty error message.
        let text = response.text().await.unwrap_or_default();
        let error_msg = if text.trim().is_empty() {
            format!("Failed to fetch function metadata, status: {}", status)
        } else {
            text
        };
        return Err(Error::functions(error_msg));
    }
    let metadata: FunctionMetadata = response.json().await?;
    info!("Retrieved metadata for function: {}", function_name);
    Ok(metadata)
}
/// Lists the available functions via `GET {url}/functions/v1`.
///
/// # Errors
///
/// * Transport failures and a body that does not decode into a list of
///   `FunctionMetadata` are propagated through `?`.
/// * A non-success status yields `Error::functions` carrying the response body, or a
///   generic message containing the status code when the body is empty or unreadable.
pub async fn list_functions(&self) -> Result<Vec<FunctionMetadata>> {
    debug!("Listing all available functions");
    let url = format!("{}/functions/v1", self.config.url);
    let response = self
        .http_client
        .get(&url)
        .header("Authorization", format!("Bearer {}", self.config.key))
        .send()
        .await?;
    let status = response.status();
    if !status.is_success() {
        // An empty body would otherwise produce an empty error message.
        let text = response.text().await.unwrap_or_default();
        let error_msg = if text.trim().is_empty() {
            format!("Failed to list functions, status: {}", status)
        } else {
            text
        };
        return Err(Error::functions(error_msg));
    }
    let functions: Vec<FunctionMetadata> = response.json().await?;
    info!("Retrieved {} functions", functions.len());
    Ok(functions)
}
pub async fn invoke_with_advanced_options(
&self,
function_name: &str,
body: Option<Value>,
options: InvokeOptions,
) -> Result<Value> {
debug!("Invoking function with advanced options: {}", function_name);
let mut attempt = 0;
let max_attempts = options.retry.as_ref().map(|r| r.max_attempts).unwrap_or(1);
loop {
attempt += 1;
match self
.invoke_function_once(function_name, body.clone(), &options)
.await
{
Ok(result) => return Ok(result),
Err(e) if attempt < max_attempts => {
warn!("Function invocation attempt {} failed: {}", attempt, e);
if let Some(retry_config) = &options.retry {
let base_delay_ms = retry_config.delay.as_millis() as u64;
let backoff_factor =
retry_config.backoff_multiplier.powi(attempt as i32 - 1);
let calculated_delay_ms = (base_delay_ms as f64 * backoff_factor) as u64;
let max_delay_ms = retry_config.max_delay.as_millis() as u64;
let delay_ms = std::cmp::min(calculated_delay_ms, max_delay_ms);
async_sleep(Duration::from_millis(delay_ms)).await;
}
}
Err(e) => return Err(e),
}
}
}
/// Invokes the named function against a local server instead of the configured project URL.
///
/// Sends `POST {local_config.local_url}/functions/v1/{function_name}` with the configured
/// key as bearer token and an `X-Local-Test: true` header. Only `local_config.local_url`
/// is consulted here; `functions_dir` and `port` are not used by this method.
///
/// # Errors
///
/// * Transport failures and a non-JSON success body are propagated through `?`.
/// * A non-success status yields `Error::functions` carrying the response body, or a
///   generic message containing the status code when the body is empty or unreadable.
pub async fn test_local(
    &self,
    function_name: &str,
    body: Option<Value>,
    local_config: LocalConfig,
) -> Result<Value> {
    debug!("Testing function locally: {}", function_name);
    let url = format!("{}/functions/v1/{}", local_config.local_url, function_name);
    let request = self
        .http_client
        .post(&url)
        .header("Authorization", format!("Bearer {}", self.config.key))
        .header("Content-Type", "application/json")
        .header("X-Local-Test", "true");
    let request = match body {
        Some(payload) => request.json(&payload),
        None => request,
    };
    let response = request.send().await?;
    let status = response.status();
    if !status.is_success() {
        // An empty body would otherwise produce an empty error message.
        let text = response.text().await.unwrap_or_default();
        let error_msg = if text.trim().is_empty() {
            format!("Local function test failed with status: {}", status)
        } else {
            text
        };
        return Err(Error::functions(error_msg));
    }
    let result: Value = response.json().await?;
    info!("Local function test completed: {}", function_name);
    Ok(result)
}
/// Returns the base URL of the functions API: the configured `url` followed by `/functions/v1`.
pub fn functions_url(&self) -> String {
format!("{}/functions/v1", self.config.url)
}
/// Performs a single invocation attempt; used by the retrying entry point.
///
/// Sends `POST {url}/functions/v1/{function_name}` with the default headers, then any
/// headers and timeout from `options`, then the optional JSON `body`.
///
/// # Errors
///
/// * Transport failures and a non-JSON success body are propagated through `?`.
/// * A non-success status yields `Error::functions` whose message comes from
///   `parse_function_error` for a JSON body, is the raw text for a non-JSON body, or is
///   a generic message containing the status code when the body is empty or unreadable.
async fn invoke_function_once(
    &self,
    function_name: &str,
    body: Option<Value>,
    options: &InvokeOptions,
) -> Result<Value> {
    let url = format!("{}/functions/v1/{}", self.config.url, function_name);
    let mut request = self
        .http_client
        .post(&url)
        .header("Authorization", format!("Bearer {}", self.config.key))
        .header("Content-Type", "application/json");
    for (key, value) in options.headers.iter().flatten() {
        request = request.header(key, value);
    }
    if let Some(timeout) = options.timeout {
        request = request.timeout(timeout);
    }
    if let Some(body) = body {
        request = request.json(&body);
    }
    let response = request.send().await?;
    let status = response.status();
    if !status.is_success() {
        // A failed read and an empty body are treated alike: fall back to a message
        // that at least names the status instead of returning an empty string.
        let text = response.text().await.unwrap_or_default();
        let error_msg = if text.trim().is_empty() {
            format!("Function invocation failed with status: {}", status)
        } else {
            match serde_json::from_str::<Value>(&text) {
                Ok(error_json) => self.parse_function_error(&error_json),
                Err(_) => text,
            }
        };
        return Err(Error::functions(error_msg));
    }
    let result: Value = response.json().await?;
    Ok(result)
}
#[cfg(not(target_arch = "wasm32"))]
async fn process_stream(
&self,
response: Response,
) -> Result<impl Stream<Item = Result<StreamChunk>>> {
use tokio_stream::StreamExt;
let text = response.text().await?;
let lines: Vec<String> = text.lines().map(|s| s.to_string()).collect();
let stream = tokio_stream::iter(lines.into_iter().map(Ok::<String, Error>));
Ok(
stream.map(|line_result: Result<String>| -> Result<StreamChunk> {
let line = line_result?;
if let Some(data_str) = line.strip_prefix("data: ") {
if data_str == "[DONE]" {
return Ok(StreamChunk {
data: Value::Null,
sequence: None,
is_final: true,
});
}
let data: Value = serde_json::from_str(data_str).map_err(|e| {
Error::functions(format!("Failed to parse stream data: {}", e))
})?;
Ok(StreamChunk {
data,
sequence: None,
is_final: false,
})
} else if !line.is_empty() && !line.starts_with(':') {
Ok(StreamChunk {
data: Value::Null,
sequence: None,
is_final: false,
})
} else {
Ok(StreamChunk {
data: Value::Null,
sequence: None,
is_final: false,
})
}
}),
)
}
fn parse_function_error(&self, error_json: &Value) -> String {
if let Some(message) = error_json.get("error") {
if let Some(details) = message.get("message") {
return details.as_str().unwrap_or("Unknown error").to_string();
}
return message.as_str().unwrap_or("Unknown error").to_string();
}
if let Some(message) = error_json.get("message") {
return message.as_str().unwrap_or("Unknown error").to_string();
}
if let Some(details) = error_json.get("details") {
return details.as_str().unwrap_or("Unknown error").to_string();
}
"Function execution failed".to_string()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AuthConfig, DatabaseConfig, HttpConfig, StorageConfig, SupabaseConfig};

    /// Builds a `Functions` client pointed at a local URL with default sub-configurations.
    fn create_test_functions() -> Functions {
        let http_client = Arc::new(HttpClient::new());
        let config = SupabaseConfig {
            url: "http://localhost:54321".to_string(),
            key: "test-key".to_string(),
            service_role_key: None,
            http_config: HttpConfig::default(),
            auth_config: AuthConfig::default(),
            database_config: DatabaseConfig::default(),
            storage_config: StorageConfig::default(),
        };
        Functions::new(Arc::new(config), http_client).unwrap()
    }

    /// Construction succeeds and the client reports the expected base URL.
    #[test]
    fn test_functions_creation() {
        let expected = "http://localhost:54321/functions/v1";
        let actual = create_test_functions().functions_url();
        assert_eq!(actual, expected);
    }

    /// The functions URL is the configured base URL followed by `/functions/v1`.
    #[test]
    fn test_functions_url_generation() {
        let expected = "http://localhost:54321/functions/v1";
        let actual = create_test_functions().functions_url();
        assert_eq!(actual, expected);
    }
}