use regex::Regex;
use reqwest::Client;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::sync::LazyLock;
use super::CollectionQuery;
use crate::Result;
/// Pattern for names that may be placed into request URLs and PostgREST expressions:
/// an ASCII letter or underscore followed by at most 127 ASCII letters, digits,
/// underscores or dots.
static SAFE_IDENTIFIER_RE: LazyLock<Regex> = LazyLock::new(|| {
    let pattern = r"^[a-zA-Z_][a-zA-Z0-9_.]{0,127}$";
    Regex::new(pattern).unwrap()
});
fn validate_identifier(name: &str) -> Result<()> {
if SAFE_IDENTIFIER_RE.is_match(name) {
Ok(())
} else {
Err(crate::Error::Data(format!(
"Invalid identifier: {:?}",
name
)))
}
}
/// Client handle for a Supabase project's REST endpoint (`{project_url}/rest/v1/...`).
#[derive(Clone)]
pub struct SupabaseDatabase {
/// HTTP client used for every request issued by this handle.
client: Client,
/// Project base URL; `new` stores it with trailing `/` characters removed.
project_url: String,
/// Key sent both as the `apikey` header and as the `Authorization: Bearer` token.
api_key: String,
}
impl SupabaseDatabase {
/// Creates a handle for the project at `project_url`, authenticating with `api_key`.
///
/// Trailing `/` characters are stripped from `project_url` so that URL building can
/// always append `/rest/v1/...`.
pub fn new(project_url: &str, api_key: &str) -> Self {
    let base_url = project_url.trim_end_matches('/');
    Self {
        client: Client::new(),
        project_url: base_url.to_owned(),
        api_key: api_key.to_owned(),
    }
}
/// Returns the REST URL for `table`: `{project_url}/rest/v1/{table}`.
///
/// `table` is inserted verbatim; callers are expected to pass a trusted or validated name.
fn rest_url(&self, table: &str) -> String {
    let base = &self.project_url;
    format!("{}/rest/v1/{}", base, table)
}
async fn get_rows(&self, table: &str, query_params: &str) -> Result<Vec<Value>> {
let url = if query_params.is_empty() {
self.rest_url(table)
} else {
format!("{}?{}", self.rest_url(table), query_params)
};
let resp = self
.client
.get(&url)
.header("apikey", &self.api_key)
.header("Authorization", format!("Bearer {}", self.api_key))
.send()
.await
.map_err(|e| crate::Error::Data(format!("Supabase GET failed: {}", e)))?;
let status = resp.status();
let text = resp
.text()
.await
.map_err(|e| crate::Error::Data(format!("Supabase response read error: {}", e)))?;
if !status.is_success() {
return Err(crate::Error::Data(format!(
"Supabase GET error ({}): {}",
status, text
)));
}
serde_json::from_str(&text)
.map_err(|e| crate::Error::Data(format!("Supabase JSON parse error: {}", e)))
}
async fn insert_row(&self, table: &str, body: &Value) -> Result<Vec<Value>> {
let url = self.rest_url(table);
let resp = self
.client
.post(&url)
.header("apikey", &self.api_key)
.header("Authorization", format!("Bearer {}", self.api_key))
.header("Content-Type", "application/json")
.header("Prefer", "return=representation")
.json(body)
.send()
.await
.map_err(|e| crate::Error::Data(format!("Supabase POST failed: {}", e)))?;
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
if !status.is_success() {
return Err(crate::Error::Data(format!(
"Supabase POST error ({}): {}",
status, text
)));
}
serde_json::from_str(&text)
.map_err(|e| crate::Error::Data(format!("Supabase POST parse error: {}", e)))
}
async fn patch_rows(
&self,
table: &str,
query_params: &str,
body: &Value,
) -> Result<Vec<Value>> {
let url = format!("{}?{}", self.rest_url(table), query_params);
let resp = self
.client
.patch(&url)
.header("apikey", &self.api_key)
.header("Authorization", format!("Bearer {}", self.api_key))
.header("Content-Type", "application/json")
.header("Prefer", "return=representation")
.json(body)
.send()
.await
.map_err(|e| crate::Error::Data(format!("Supabase PATCH failed: {}", e)))?;
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
if !status.is_success() {
return Err(crate::Error::Data(format!(
"Supabase PATCH error ({}): {}",
status, text
)));
}
serde_json::from_str(&text)
.map_err(|e| crate::Error::Data(format!("Supabase PATCH parse error: {}", e)))
}
async fn delete_rows(&self, table: &str, query_params: &str) -> Result<Vec<Value>> {
let url = format!("{}?{}", self.rest_url(table), query_params);
let resp = self
.client
.delete(&url)
.header("apikey", &self.api_key)
.header("Authorization", format!("Bearer {}", self.api_key))
.header("Prefer", "return=representation")
.send()
.await
.map_err(|e| crate::Error::Data(format!("Supabase DELETE failed: {}", e)))?;
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
if !status.is_success() {
return Err(crate::Error::Data(format!(
"Supabase DELETE error ({}): {}",
status, text
)));
}
serde_json::from_str(&text)
.map_err(|e| crate::Error::Data(format!("Supabase DELETE parse error: {}", e)))
}
/// Fetches every row of collection `name` and converts each one with `row_to_item`.
///
/// # Errors
/// Fails when `name` is not a valid identifier or when the underlying GET fails.
pub async fn get_collection(&self, name: &str) -> Result<Vec<Value>> {
    validate_identifier(name)?;
    let rows = self.get_rows(name, "select=id,data").await?;
    let items: Vec<Value> = rows.iter().map(Self::row_to_item).collect();
    Ok(items)
}
pub async fn query_collection(
&self,
name: &str,
query: &CollectionQuery,
) -> Result<Vec<Value>> {
validate_identifier(name)?;
let mut params = vec!["select=id,data".to_string()];
if let Some(ref filter_expr) = query.filter {
if let Some(filter_params) = build_postgrest_filter(filter_expr) {
params.extend(filter_params);
}
}
for forced in &query.forced_filters {
if let Some(filter_params) = build_postgrest_filter(forced) {
params.extend(filter_params);
}
}
if let Some(ref search) = query.search {
if !search.is_empty() {
params.push(format!("data=ilike.*{}*", urlencoding::encode(search)));
}
}
if let Some(ref sort) = query.sort {
let (field, desc) = if let Some((f, d)) = sort.rsplit_once(':') {
(f, d.eq_ignore_ascii_case("desc"))
} else {
(sort.as_str(), false)
};
validate_identifier(field)?;
params.push(format!(
"order=data->>{}.{}",
field,
if desc { "desc" } else { "asc" }
));
}
if let Some(limit) = query.limit {
params.push(format!("limit={}", limit));
}
if let Some(offset) = query.offset {
params.push(format!("offset={}", offset));
}
let query_string = params.join("&");
let rows = self.get_rows(name, &query_string).await?;
Ok(rows
.into_iter()
.map(|row| Self::row_to_item(&row))
.collect())
}
/// Returns every item of `collection` whose JSON field `field` equals `value`.
///
/// String values are compared by their content; any other JSON value is compared by its
/// JSON text. The value is URL-encoded before being placed in the query.
///
/// # Errors
/// Fails when `collection` or `field` is not a valid identifier or when the GET fails.
pub async fn find_by(
    &self,
    collection: &str,
    field: &str,
    value: &Value,
) -> Result<Vec<Value>> {
    validate_identifier(collection)?;
    validate_identifier(field)?;
    let needle = if let Value::String(text) = value {
        text.clone()
    } else {
        value.to_string()
    };
    let encoded = urlencoding::encode(&needle);
    let query = format!("select=id,data&data->>{}=eq.{}", field, encoded);
    let mut items = Vec::new();
    for row in self.get_rows(collection, &query).await? {
        items.push(Self::row_to_item(&row));
    }
    Ok(items)
}
/// Returns the first item of `collection` whose JSON field `field` equals `value`, if any.
///
/// The query is sent with `limit=1`. String values are compared by their content; any
/// other JSON value is compared by its JSON text.
///
/// # Errors
/// Fails when `collection` or `field` is not a valid identifier or when the GET fails.
pub async fn find_one_by(
    &self,
    collection: &str,
    field: &str,
    value: &Value,
) -> Result<Option<Value>> {
    validate_identifier(collection)?;
    validate_identifier(field)?;
    let needle = if let Value::String(text) = value {
        text.clone()
    } else {
        value.to_string()
    };
    let encoded = urlencoding::encode(&needle);
    let query = format!("select=id,data&data->>{}=eq.{}&limit=1", field, encoded);
    let rows = self.get_rows(collection, &query).await?;
    let first = rows.first().map(Self::row_to_item);
    Ok(first)
}
pub async fn create(&self, collection: &str, mut item: Value) -> Result<Value> {
validate_identifier(collection)?;
if let Value::Object(ref mut map) = item {
map.remove("id");
}
let body = json!({ "data": item });
let rows = self.insert_row(collection, &body).await?;
let row = rows
.into_iter()
.next()
.ok_or_else(|| crate::Error::Data("Supabase insert returned no rows".to_string()))?;
Ok(Self::row_to_item(&row))
}
pub async fn update(
&self,
collection: &str,
id: &Value,
updates: Value,
) -> Result<Option<Value>> {
validate_identifier(collection)?;
let id_str = match id {
Value::Number(n) => n.to_string(),
Value::String(s) => s.clone(),
other => other.to_string(),
};
let query = format!("select=id,data&id=eq.{}", id_str);
let rows = self.get_rows(collection, &query).await?;
let row = match rows.into_iter().next() {
Some(r) => r,
None => return Ok(None),
};
let mut current = Self::row_to_item(&row);
if let (Value::Object(map), Value::Object(updates_map)) = (&mut current, &updates) {
for (k, v) in updates_map {
map.insert(k.clone(), v.clone());
}
}
let mut data_for_storage = current.clone();
if let Value::Object(ref mut map) = data_for_storage {
map.remove("id");
}
let filter = format!("id=eq.{}", id_str);
self.patch_rows(collection, &filter, &json!({ "data": data_for_storage }))
.await?;
Ok(Some(current))
}
pub async fn delete(&self, collection: &str, id: &Value) -> Result<bool> {
validate_identifier(collection)?;
let id_str = match id {
Value::Number(n) => n.to_string(),
Value::String(s) => s.clone(),
other => other.to_string(),
};
let filter = format!("id=eq.{}", id_str);
let deleted = self.delete_rows(collection, &filter).await?;
Ok(!deleted.is_empty())
}
pub async fn set(&self, key: &str, value: Value) -> Result<()> {
let value_str = serde_json::to_string(&value)?;
let url = self.rest_url("_kv_store");
let resp = self
.client
.post(&url)
.header("apikey", &self.api_key)
.header("Authorization", format!("Bearer {}", self.api_key))
.header("Content-Type", "application/json")
.header("Prefer", "resolution=merge-duplicates")
.json(&json!({ "key": key, "value": value_str }))
.send()
.await
.map_err(|e| crate::Error::Data(format!("Supabase KV set failed: {}", e)))?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(crate::Error::Data(format!(
"Supabase KV set error: {}",
text
)));
}
Ok(())
}
pub async fn get(&self, key: &str) -> Result<Option<Value>> {
let query = format!("select=value&key=eq.{}", urlencoding::encode(key));
let rows = self.get_rows("_kv_store", &query).await?;
Ok(rows.into_iter().next().and_then(|row| {
row.get("value")
.and_then(|v| v.as_str())
.and_then(|s| serde_json::from_str(s).ok())
}))
}
/// Deletes `key` from the `_kv_store` table and returns the value it held.
///
/// The value is read first; the DELETE is only issued when that read produced a value.
///
/// # Errors
/// Fails when the read or the DELETE fails.
pub async fn remove(&self, key: &str) -> Result<Option<Value>> {
    let previous = self.get(key).await?;
    if previous.is_none() {
        return Ok(None);
    }
    let filter = format!("key=eq.{}", urlencoding::encode(key));
    self.delete_rows("_kv_store", &filter).await?;
    Ok(previous)
}
/// Reads the value under `key`, passes it to `f`, stores the result and returns it.
///
/// NOTE(review): the read and the write are two separate requests; nothing in this
/// function prevents another writer from changing the key in between — confirm whether
/// callers rely on stronger guarantees than that.
///
/// # Errors
/// Fails when the read or the write fails.
pub async fn atomic_modify<F>(&self, key: &str, f: F) -> Result<Value>
where
    F: FnOnce(Option<&Value>) -> Value,
{
    let previous = self.get(key).await?;
    let updated = f(previous.as_ref());
    let stored = updated.clone();
    self.set(key, stored).await.map(|()| updated)
}
pub async fn as_context(&self) -> Result<HashMap<String, Value>> {
let mut context = HashMap::new();
let collections = self
.get_rows("_collections", "select=name")
.await
.unwrap_or_default();
for row in collections {
if let Some(name) = row.get("name").and_then(|v| v.as_str()) {
if let Ok(items) = self.get_collection(name).await {
context.insert(name.to_string(), json!(items));
}
}
}
let kv_rows = self
.get_rows("_kv_store", "select=key,value")
.await
.unwrap_or_default();
for row in kv_rows {
if let (Some(key), Some(value_str)) = (
row.get("key").and_then(|v| v.as_str()),
row.get("value").and_then(|v| v.as_str()),
) {
if let Ok(value) = serde_json::from_str::<Value>(value_str) {
context.insert(key.to_string(), value);
}
}
}
Ok(context)
}
pub async fn set_collection(&self, name: &str, items: Vec<Value>) -> Result<()> {
validate_identifier(name)?;
let url = format!("{}?id=gt.0", self.rest_url(name));
self.client
.delete(&url)
.header("apikey", &self.api_key)
.header("Authorization", format!("Bearer {}", self.api_key))
.send()
.await
.map_err(|e| crate::Error::Data(format!("Supabase clear collection failed: {}", e)))?;
for item in items {
self.create(name, item).await?;
}
Ok(())
}
/// Converts a `{ id, data }` row into an item: the `data` payload with the row's `id`
/// added under the `id` key.
///
/// `data` may be a JSON object or a string containing JSON. A missing, unparseable or
/// otherwise typed `data` yields an empty object. A missing `id` becomes `0`. If a string
/// payload parses to something other than an object, it is returned as-is without an `id`.
fn row_to_item(row: &Value) -> Value {
    let mut item = match row.get("data") {
        Some(Value::Object(fields)) => Value::Object(fields.clone()),
        Some(Value::String(raw)) => serde_json::from_str(raw).unwrap_or_else(|_| json!({})),
        _ => json!({}),
    };
    if let Some(fields) = item.as_object_mut() {
        let id = row.get("id").cloned().unwrap_or_else(|| json!(0));
        fields.insert("id".to_string(), id);
    }
    item
}
}
/// Translates a filter expression into PostgREST query parameters.
///
/// `,` separates OR groups and `&` separates AND conditions inside a group. Without a
/// comma, every condition becomes its own query parameter. With commas, the result is a
/// single `or=(...)` parameter in which multi-condition groups are wrapped in `and(...)`.
/// Conditions that cannot be translated are skipped; `None` is returned when nothing
/// usable remains.
fn build_postgrest_filter(filter_expr: &str) -> Option<Vec<String>> {
    if !filter_expr.contains(',') {
        // Single group: plain AND semantics, one query parameter per condition.
        let params: Vec<String> = filter_expr
            .split('&')
            .filter_map(|cond| build_postgrest_condition(cond.trim()))
            .collect();
        return if params.is_empty() { None } else { Some(params) };
    }

    let or_parts: Vec<String> = filter_expr
        .split(',')
        .filter_map(|group| {
            let mut and_parts: Vec<String> = group
                .split('&')
                .filter_map(|cond| build_postgrest_condition_part(cond.trim()))
                .collect();
            match and_parts.len() {
                0 => None,
                1 => and_parts.pop(),
                _ => Some(format!("and({})", and_parts.join(","))),
            }
        })
        .collect();

    if or_parts.is_empty() {
        None
    } else {
        Some(vec![format!("or=({})", or_parts.join(","))])
    }
}
/// Splits a `field OP value` condition at its leftmost comparison operator.
///
/// Supported operators are `>=`, `<=`, `>`, `<` and `=`. Returns the trimmed field name,
/// the PostgREST operator name and the trimmed value, or `None` when there is no operator
/// or the field does not match [`SAFE_IDENTIFIER_RE`].
///
/// Splitting at the leftmost operator keeps operator characters inside the value intact:
/// `title=a>b` is an equality test against `a>b`. Trying the operators in a fixed order
/// would split that input at `>` and reject it.
fn split_postgrest_condition(cond: &str) -> Option<(&str, &'static str, &str)> {
    let op_start = cond.find(|c: char| matches!(c, '<' | '>' | '='))?;
    // All operator characters are ASCII, so these byte offsets are char boundaries.
    let (lhs, rest) = cond.split_at(op_start);
    let (pg_op, op_len) = match rest.as_bytes() {
        [b'>', b'=', ..] => ("gte", 2),
        [b'<', b'=', ..] => ("lte", 2),
        [b'>', ..] => ("gt", 1),
        [b'<', ..] => ("lt", 1),
        _ => ("eq", 1),
    };
    let field = lhs.trim();
    if !SAFE_IDENTIFIER_RE.is_match(field) {
        return None;
    }
    Some((field, pg_op, rest[op_len..].trim()))
}

/// Translates one condition into a standalone query parameter,
/// `data->>{field}={op}.{value}`, with the value URL-encoded.
///
/// Returns `None` when the condition has no supported operator or its field is not a
/// safe identifier.
fn build_postgrest_condition(cond: &str) -> Option<String> {
    let (field, pg_op, val) = split_postgrest_condition(cond)?;
    Some(format!(
        "data->>{}={}.{}",
        field,
        pg_op,
        urlencoding::encode(val)
    ))
}

/// Translates one condition into the dotted form used inside `or=(...)` / `and(...)`,
/// `data->>{field}.{op}.{value}`, with the value URL-encoded.
///
/// Returns `None` when the condition has no supported operator or its field is not a
/// safe identifier.
fn build_postgrest_condition_part(cond: &str) -> Option<String> {
    let (field, pg_op, val) = split_postgrest_condition(cond)?;
    Some(format!(
        "data->>{}.{}.{}",
        field,
        pg_op,
        urlencoding::encode(val)
    ))
}