1use regex::Regex;
11use reqwest::Client;
12use serde_json::{Value, json};
13use std::collections::HashMap;
14use std::sync::LazyLock;
15
16use super::CollectionQuery;
17use crate::Result;
18
19static SAFE_IDENTIFIER_RE: LazyLock<Regex> =
21 LazyLock::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_.]{0,127}$").unwrap());
22
23fn validate_identifier(name: &str) -> Result<()> {
24 if SAFE_IDENTIFIER_RE.is_match(name) {
25 Ok(())
26 } else {
27 Err(crate::Error::Data(format!(
28 "Invalid identifier: {:?}",
29 name
30 )))
31 }
32}
33
34#[derive(Clone)]
36pub struct SupabaseDatabase {
37 client: Client,
38 project_url: String,
40 api_key: String,
42}
43
44impl SupabaseDatabase {
45 pub fn new(project_url: &str, api_key: &str) -> Self {
46 Self {
47 client: Client::new(),
48 project_url: project_url.trim_end_matches('/').to_string(),
49 api_key: api_key.to_string(),
50 }
51 }
52
53 fn rest_url(&self, table: &str) -> String {
55 format!("{}/rest/v1/{}", self.project_url, table)
56 }
57
58 async fn get_rows(&self, table: &str, query_params: &str) -> Result<Vec<Value>> {
60 let url = if query_params.is_empty() {
61 self.rest_url(table)
62 } else {
63 format!("{}?{}", self.rest_url(table), query_params)
64 };
65
66 let resp = self
67 .client
68 .get(&url)
69 .header("apikey", &self.api_key)
70 .header("Authorization", format!("Bearer {}", self.api_key))
71 .send()
72 .await
73 .map_err(|e| crate::Error::Data(format!("Supabase GET failed: {}", e)))?;
74
75 let status = resp.status();
76 let text = resp
77 .text()
78 .await
79 .map_err(|e| crate::Error::Data(format!("Supabase response read error: {}", e)))?;
80
81 if !status.is_success() {
82 return Err(crate::Error::Data(format!(
83 "Supabase GET error ({}): {}",
84 status, text
85 )));
86 }
87
88 serde_json::from_str(&text)
89 .map_err(|e| crate::Error::Data(format!("Supabase JSON parse error: {}", e)))
90 }
91
92 async fn insert_row(&self, table: &str, body: &Value) -> Result<Vec<Value>> {
94 let url = self.rest_url(table);
95 let resp = self
96 .client
97 .post(&url)
98 .header("apikey", &self.api_key)
99 .header("Authorization", format!("Bearer {}", self.api_key))
100 .header("Content-Type", "application/json")
101 .header("Prefer", "return=representation")
102 .json(body)
103 .send()
104 .await
105 .map_err(|e| crate::Error::Data(format!("Supabase POST failed: {}", e)))?;
106
107 let status = resp.status();
108 let text = resp.text().await.unwrap_or_default();
109
110 if !status.is_success() {
111 return Err(crate::Error::Data(format!(
112 "Supabase POST error ({}): {}",
113 status, text
114 )));
115 }
116
117 serde_json::from_str(&text)
118 .map_err(|e| crate::Error::Data(format!("Supabase POST parse error: {}", e)))
119 }
120
121 async fn patch_rows(
123 &self,
124 table: &str,
125 query_params: &str,
126 body: &Value,
127 ) -> Result<Vec<Value>> {
128 let url = format!("{}?{}", self.rest_url(table), query_params);
129 let resp = self
130 .client
131 .patch(&url)
132 .header("apikey", &self.api_key)
133 .header("Authorization", format!("Bearer {}", self.api_key))
134 .header("Content-Type", "application/json")
135 .header("Prefer", "return=representation")
136 .json(body)
137 .send()
138 .await
139 .map_err(|e| crate::Error::Data(format!("Supabase PATCH failed: {}", e)))?;
140
141 let status = resp.status();
142 let text = resp.text().await.unwrap_or_default();
143
144 if !status.is_success() {
145 return Err(crate::Error::Data(format!(
146 "Supabase PATCH error ({}): {}",
147 status, text
148 )));
149 }
150
151 serde_json::from_str(&text)
152 .map_err(|e| crate::Error::Data(format!("Supabase PATCH parse error: {}", e)))
153 }
154
155 async fn delete_rows(&self, table: &str, query_params: &str) -> Result<Vec<Value>> {
157 let url = format!("{}?{}", self.rest_url(table), query_params);
158 let resp = self
159 .client
160 .delete(&url)
161 .header("apikey", &self.api_key)
162 .header("Authorization", format!("Bearer {}", self.api_key))
163 .header("Prefer", "return=representation")
164 .send()
165 .await
166 .map_err(|e| crate::Error::Data(format!("Supabase DELETE failed: {}", e)))?;
167
168 let status = resp.status();
169 let text = resp.text().await.unwrap_or_default();
170
171 if !status.is_success() {
172 return Err(crate::Error::Data(format!(
173 "Supabase DELETE error ({}): {}",
174 status, text
175 )));
176 }
177
178 serde_json::from_str(&text)
179 .map_err(|e| crate::Error::Data(format!("Supabase DELETE parse error: {}", e)))
180 }
181
182 pub async fn get_collection(&self, name: &str) -> Result<Vec<Value>> {
187 validate_identifier(name)?;
188 let rows = self.get_rows(name, "select=id,data").await?;
189 Ok(rows
190 .into_iter()
191 .map(|row| Self::row_to_item(&row))
192 .collect())
193 }
194
195 pub async fn query_collection(
196 &self,
197 name: &str,
198 query: &CollectionQuery,
199 ) -> Result<Vec<Value>> {
200 validate_identifier(name)?;
201 let mut params = vec!["select=id,data".to_string()];
202
203 if let Some(ref filter_expr) = query.filter {
205 if let Some(filter_params) = build_postgrest_filter(filter_expr) {
206 params.extend(filter_params);
207 }
208 }
209
210 for forced in &query.forced_filters {
213 if let Some(filter_params) = build_postgrest_filter(forced) {
214 params.extend(filter_params);
215 }
216 }
217
218 if let Some(ref search) = query.search {
220 if !search.is_empty() {
221 params.push(format!("data=ilike.*{}*", urlencoding::encode(search)));
222 }
223 }
224
225 if let Some(ref sort) = query.sort {
227 let (field, desc) = if let Some((f, d)) = sort.rsplit_once(':') {
228 (f, d.eq_ignore_ascii_case("desc"))
229 } else {
230 (sort.as_str(), false)
231 };
232 validate_identifier(field)?;
233 params.push(format!(
234 "order=data->>{}.{}",
235 field,
236 if desc { "desc" } else { "asc" }
237 ));
238 }
239
240 if let Some(limit) = query.limit {
242 params.push(format!("limit={}", limit));
243 }
244 if let Some(offset) = query.offset {
245 params.push(format!("offset={}", offset));
246 }
247
248 let query_string = params.join("&");
249 let rows = self.get_rows(name, &query_string).await?;
250 Ok(rows
251 .into_iter()
252 .map(|row| Self::row_to_item(&row))
253 .collect())
254 }
255
256 pub async fn find_by(
257 &self,
258 collection: &str,
259 field: &str,
260 value: &Value,
261 ) -> Result<Vec<Value>> {
262 validate_identifier(collection)?;
263 validate_identifier(field)?;
264 let val_str = match value {
265 Value::String(s) => s.clone(),
266 other => other.to_string(),
267 };
268 let query = format!(
269 "select=id,data&data->>{}=eq.{}",
270 field,
271 urlencoding::encode(&val_str)
272 );
273 let rows = self.get_rows(collection, &query).await?;
274 Ok(rows
275 .into_iter()
276 .map(|row| Self::row_to_item(&row))
277 .collect())
278 }
279
280 pub async fn find_one_by(
281 &self,
282 collection: &str,
283 field: &str,
284 value: &Value,
285 ) -> Result<Option<Value>> {
286 validate_identifier(collection)?;
287 validate_identifier(field)?;
288 let val_str = match value {
289 Value::String(s) => s.clone(),
290 other => other.to_string(),
291 };
292 let query = format!(
293 "select=id,data&data->>{}=eq.{}&limit=1",
294 field,
295 urlencoding::encode(&val_str)
296 );
297 let rows = self.get_rows(collection, &query).await?;
298 Ok(rows.into_iter().next().map(|row| Self::row_to_item(&row)))
299 }
300
301 pub async fn create(&self, collection: &str, mut item: Value) -> Result<Value> {
302 validate_identifier(collection)?;
303 if let Value::Object(ref mut map) = item {
305 map.remove("id");
306 }
307
308 let body = json!({ "data": item });
309 let rows = self.insert_row(collection, &body).await?;
310
311 let row = rows
312 .into_iter()
313 .next()
314 .ok_or_else(|| crate::Error::Data("Supabase insert returned no rows".to_string()))?;
315
316 Ok(Self::row_to_item(&row))
317 }
318
319 pub async fn update(
320 &self,
321 collection: &str,
322 id: &Value,
323 updates: Value,
324 ) -> Result<Option<Value>> {
325 validate_identifier(collection)?;
326
327 let id_str = match id {
329 Value::Number(n) => n.to_string(),
330 Value::String(s) => s.clone(),
331 other => other.to_string(),
332 };
333 let query = format!("select=id,data&id=eq.{}", id_str);
334 let rows = self.get_rows(collection, &query).await?;
335
336 let row = match rows.into_iter().next() {
337 Some(r) => r,
338 None => return Ok(None),
339 };
340
341 let mut current = Self::row_to_item(&row);
342
343 if let (Value::Object(map), Value::Object(updates_map)) = (&mut current, &updates) {
345 for (k, v) in updates_map {
346 map.insert(k.clone(), v.clone());
347 }
348 }
349
350 let mut data_for_storage = current.clone();
352 if let Value::Object(ref mut map) = data_for_storage {
353 map.remove("id");
354 }
355
356 let filter = format!("id=eq.{}", id_str);
357 self.patch_rows(collection, &filter, &json!({ "data": data_for_storage }))
358 .await?;
359
360 Ok(Some(current))
361 }
362
363 pub async fn delete(&self, collection: &str, id: &Value) -> Result<bool> {
364 validate_identifier(collection)?;
365 let id_str = match id {
366 Value::Number(n) => n.to_string(),
367 Value::String(s) => s.clone(),
368 other => other.to_string(),
369 };
370 let filter = format!("id=eq.{}", id_str);
371 let deleted = self.delete_rows(collection, &filter).await?;
372 Ok(!deleted.is_empty())
373 }
374
375 pub async fn set(&self, key: &str, value: Value) -> Result<()> {
380 let value_str = serde_json::to_string(&value)?;
381 let url = self.rest_url("_kv_store");
383 let resp = self
384 .client
385 .post(&url)
386 .header("apikey", &self.api_key)
387 .header("Authorization", format!("Bearer {}", self.api_key))
388 .header("Content-Type", "application/json")
389 .header("Prefer", "resolution=merge-duplicates")
390 .json(&json!({ "key": key, "value": value_str }))
391 .send()
392 .await
393 .map_err(|e| crate::Error::Data(format!("Supabase KV set failed: {}", e)))?;
394
395 if !resp.status().is_success() {
396 let text = resp.text().await.unwrap_or_default();
397 return Err(crate::Error::Data(format!(
398 "Supabase KV set error: {}",
399 text
400 )));
401 }
402 Ok(())
403 }
404
405 pub async fn get(&self, key: &str) -> Result<Option<Value>> {
406 let query = format!("select=value&key=eq.{}", urlencoding::encode(key));
407 let rows = self.get_rows("_kv_store", &query).await?;
408
409 Ok(rows.into_iter().next().and_then(|row| {
410 row.get("value")
411 .and_then(|v| v.as_str())
412 .and_then(|s| serde_json::from_str(s).ok())
413 }))
414 }
415
416 pub async fn remove(&self, key: &str) -> Result<Option<Value>> {
417 let existing = self.get(key).await?;
418 if existing.is_some() {
419 let filter = format!("key=eq.{}", urlencoding::encode(key));
420 self.delete_rows("_kv_store", &filter).await?;
421 }
422 Ok(existing)
423 }
424
425 pub async fn atomic_modify<F>(&self, key: &str, f: F) -> Result<Value>
426 where
427 F: FnOnce(Option<&Value>) -> Value,
428 {
429 let current = self.get(key).await?;
430 let new_value = f(current.as_ref());
431 self.set(key, new_value.clone()).await?;
432 Ok(new_value)
433 }
434
435 pub async fn as_context(&self) -> Result<HashMap<String, Value>> {
440 let mut context = HashMap::new();
441
442 let collections = self
444 .get_rows("_collections", "select=name")
445 .await
446 .unwrap_or_default();
447 for row in collections {
448 if let Some(name) = row.get("name").and_then(|v| v.as_str()) {
449 if let Ok(items) = self.get_collection(name).await {
450 context.insert(name.to_string(), json!(items));
451 }
452 }
453 }
454
455 let kv_rows = self
457 .get_rows("_kv_store", "select=key,value")
458 .await
459 .unwrap_or_default();
460 for row in kv_rows {
461 if let (Some(key), Some(value_str)) = (
462 row.get("key").and_then(|v| v.as_str()),
463 row.get("value").and_then(|v| v.as_str()),
464 ) {
465 if let Ok(value) = serde_json::from_str::<Value>(value_str) {
466 context.insert(key.to_string(), value);
467 }
468 }
469 }
470
471 Ok(context)
472 }
473
474 pub async fn set_collection(&self, name: &str, items: Vec<Value>) -> Result<()> {
475 validate_identifier(name)?;
476 let url = format!("{}?id=gt.0", self.rest_url(name));
479 self.client
480 .delete(&url)
481 .header("apikey", &self.api_key)
482 .header("Authorization", format!("Bearer {}", self.api_key))
483 .send()
484 .await
485 .map_err(|e| crate::Error::Data(format!("Supabase clear collection failed: {}", e)))?;
486
487 for item in items {
489 self.create(name, item).await?;
490 }
491
492 Ok(())
493 }
494
495 fn row_to_item(row: &Value) -> Value {
501 let id = row.get("id").cloned().unwrap_or(json!(0));
502 let data = row.get("data");
503
504 let mut item: Value = match data {
506 Some(Value::Object(map)) => Value::Object(map.clone()),
507 Some(Value::String(s)) => serde_json::from_str(s).unwrap_or(json!({})),
508 _ => json!({}),
509 };
510
511 if let Value::Object(ref mut map) = item {
512 map.insert("id".to_string(), id);
513 }
514 item
515 }
516}
517
518fn build_postgrest_filter(filter_expr: &str) -> Option<Vec<String>> {
521 let or_groups: Vec<&str> = filter_expr.split(',').collect();
522
523 if or_groups.len() == 1 {
524 let and_conditions: Vec<&str> = or_groups[0].split('&').collect();
526 let mut params = Vec::new();
527 for cond in and_conditions {
528 if let Some(param) = build_postgrest_condition(cond.trim()) {
529 params.push(param);
530 }
531 }
532 if params.is_empty() {
533 None
534 } else {
535 Some(params)
536 }
537 } else {
538 let mut or_parts = Vec::new();
540 for group in or_groups {
541 let and_conditions: Vec<&str> = group.split('&').collect();
542 let mut and_parts = Vec::new();
543 for cond in and_conditions {
544 if let Some(part) = build_postgrest_condition_part(cond.trim()) {
545 and_parts.push(part);
546 }
547 }
548 if and_parts.len() == 1 {
549 or_parts.push(and_parts.into_iter().next().unwrap());
550 } else if !and_parts.is_empty() {
551 or_parts.push(format!("and({})", and_parts.join(",")));
552 }
553 }
554 if or_parts.is_empty() {
555 None
556 } else {
557 Some(vec![format!("or=({})", or_parts.join(","))])
558 }
559 }
560}
561
562fn build_postgrest_condition(cond: &str) -> Option<String> {
564 let operators = [
565 (">=", "gte"),
566 ("<=", "lte"),
567 (">", "gt"),
568 ("<", "lt"),
569 ("=", "eq"),
570 ];
571 for (op, pg_op) in operators {
572 if let Some((field, val)) = cond.split_once(op) {
573 let field = field.trim();
574 let val = val.trim();
575 if !SAFE_IDENTIFIER_RE.is_match(field) {
576 return None;
577 }
578 return Some(format!(
579 "data->>{}={}.{}",
580 field,
581 pg_op,
582 urlencoding::encode(val)
583 ));
584 }
585 }
586 None
587}
588
589fn build_postgrest_condition_part(cond: &str) -> Option<String> {
591 let operators = [
592 (">=", "gte"),
593 ("<=", "lte"),
594 (">", "gt"),
595 ("<", "lt"),
596 ("=", "eq"),
597 ];
598 for (op, pg_op) in operators {
599 if let Some((field, val)) = cond.split_once(op) {
600 let field = field.trim();
601 let val = val.trim();
602 if !SAFE_IDENTIFIER_RE.is_match(field) {
603 return None;
604 }
605 return Some(format!(
606 "data->>{}.{}.{}",
607 field,
608 pg_op,
609 urlencoding::encode(val)
610 ));
611 }
612 }
613 None
614}