Skip to main content

what_core/database/
supabase.rs

1//! Supabase database backend (PostgREST API)
2//!
3//! Uses the same JSON-blob schema as the SQLite/D1 backends:
4//!   - Collections: tables with `id BIGSERIAL PRIMARY KEY, data JSONB`
5//!   - Key-value: `_kv_store(key TEXT PRIMARY KEY, value TEXT)`
6//!   - Collection registry: `_collections(name TEXT PRIMARY KEY)`
7//!
8//! All operations go through the Supabase PostgREST REST API.
9
10use regex::Regex;
11use reqwest::Client;
12use serde_json::{Value, json};
13use std::collections::HashMap;
14use std::sync::LazyLock;
15
16use super::CollectionQuery;
17use crate::Result;
18
19/// Validate that an identifier (table name, field name) is safe.
20static SAFE_IDENTIFIER_RE: LazyLock<Regex> =
21    LazyLock::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_.]{0,127}$").unwrap());
22
23fn validate_identifier(name: &str) -> Result<()> {
24    if SAFE_IDENTIFIER_RE.is_match(name) {
25        Ok(())
26    } else {
27        Err(crate::Error::Data(format!(
28            "Invalid identifier: {:?}",
29            name
30        )))
31    }
32}
33
34/// Supabase PostgREST database client
35#[derive(Clone)]
36pub struct SupabaseDatabase {
37    client: Client,
38    /// Base URL: https://<project-ref>.supabase.co
39    project_url: String,
40    /// service_role key (NOT anon key — bypasses RLS)
41    api_key: String,
42}
43
44impl SupabaseDatabase {
45    pub fn new(project_url: &str, api_key: &str) -> Self {
46        Self {
47            client: Client::new(),
48            project_url: project_url.trim_end_matches('/').to_string(),
49            api_key: api_key.to_string(),
50        }
51    }
52
53    /// Build the PostgREST base URL
54    fn rest_url(&self, table: &str) -> String {
55        format!("{}/rest/v1/{}", self.project_url, table)
56    }
57
58    /// GET request to PostgREST
59    async fn get_rows(&self, table: &str, query_params: &str) -> Result<Vec<Value>> {
60        let url = if query_params.is_empty() {
61            self.rest_url(table)
62        } else {
63            format!("{}?{}", self.rest_url(table), query_params)
64        };
65
66        let resp = self
67            .client
68            .get(&url)
69            .header("apikey", &self.api_key)
70            .header("Authorization", format!("Bearer {}", self.api_key))
71            .send()
72            .await
73            .map_err(|e| crate::Error::Data(format!("Supabase GET failed: {}", e)))?;
74
75        let status = resp.status();
76        let text = resp
77            .text()
78            .await
79            .map_err(|e| crate::Error::Data(format!("Supabase response read error: {}", e)))?;
80
81        if !status.is_success() {
82            return Err(crate::Error::Data(format!(
83                "Supabase GET error ({}): {}",
84                status, text
85            )));
86        }
87
88        serde_json::from_str(&text)
89            .map_err(|e| crate::Error::Data(format!("Supabase JSON parse error: {}", e)))
90    }
91
92    /// POST (insert) to PostgREST — returns inserted rows
93    async fn insert_row(&self, table: &str, body: &Value) -> Result<Vec<Value>> {
94        let url = self.rest_url(table);
95        let resp = self
96            .client
97            .post(&url)
98            .header("apikey", &self.api_key)
99            .header("Authorization", format!("Bearer {}", self.api_key))
100            .header("Content-Type", "application/json")
101            .header("Prefer", "return=representation")
102            .json(body)
103            .send()
104            .await
105            .map_err(|e| crate::Error::Data(format!("Supabase POST failed: {}", e)))?;
106
107        let status = resp.status();
108        let text = resp.text().await.unwrap_or_default();
109
110        if !status.is_success() {
111            return Err(crate::Error::Data(format!(
112                "Supabase POST error ({}): {}",
113                status, text
114            )));
115        }
116
117        serde_json::from_str(&text)
118            .map_err(|e| crate::Error::Data(format!("Supabase POST parse error: {}", e)))
119    }
120
121    /// PATCH (update) to PostgREST
122    async fn patch_rows(
123        &self,
124        table: &str,
125        query_params: &str,
126        body: &Value,
127    ) -> Result<Vec<Value>> {
128        let url = format!("{}?{}", self.rest_url(table), query_params);
129        let resp = self
130            .client
131            .patch(&url)
132            .header("apikey", &self.api_key)
133            .header("Authorization", format!("Bearer {}", self.api_key))
134            .header("Content-Type", "application/json")
135            .header("Prefer", "return=representation")
136            .json(body)
137            .send()
138            .await
139            .map_err(|e| crate::Error::Data(format!("Supabase PATCH failed: {}", e)))?;
140
141        let status = resp.status();
142        let text = resp.text().await.unwrap_or_default();
143
144        if !status.is_success() {
145            return Err(crate::Error::Data(format!(
146                "Supabase PATCH error ({}): {}",
147                status, text
148            )));
149        }
150
151        serde_json::from_str(&text)
152            .map_err(|e| crate::Error::Data(format!("Supabase PATCH parse error: {}", e)))
153    }
154
155    /// DELETE from PostgREST
156    async fn delete_rows(&self, table: &str, query_params: &str) -> Result<Vec<Value>> {
157        let url = format!("{}?{}", self.rest_url(table), query_params);
158        let resp = self
159            .client
160            .delete(&url)
161            .header("apikey", &self.api_key)
162            .header("Authorization", format!("Bearer {}", self.api_key))
163            .header("Prefer", "return=representation")
164            .send()
165            .await
166            .map_err(|e| crate::Error::Data(format!("Supabase DELETE failed: {}", e)))?;
167
168        let status = resp.status();
169        let text = resp.text().await.unwrap_or_default();
170
171        if !status.is_success() {
172            return Err(crate::Error::Data(format!(
173                "Supabase DELETE error ({}): {}",
174                status, text
175            )));
176        }
177
178        serde_json::from_str(&text)
179            .map_err(|e| crate::Error::Data(format!("Supabase DELETE parse error: {}", e)))
180    }
181
182    // -----------------------------------------------------------------------
183    // Collection operations
184    // -----------------------------------------------------------------------
185
186    pub async fn get_collection(&self, name: &str) -> Result<Vec<Value>> {
187        validate_identifier(name)?;
188        let rows = self.get_rows(name, "select=id,data").await?;
189        Ok(rows
190            .into_iter()
191            .map(|row| Self::row_to_item(&row))
192            .collect())
193    }
194
195    pub async fn query_collection(
196        &self,
197        name: &str,
198        query: &CollectionQuery,
199    ) -> Result<Vec<Value>> {
200        validate_identifier(name)?;
201        let mut params = vec!["select=id,data".to_string()];
202
203        // Filter
204        if let Some(ref filter_expr) = query.filter {
205            if let Some(filter_params) = build_postgrest_filter(filter_expr) {
206                params.extend(filter_params);
207            }
208        }
209
210        // Policy-forced scope filters — each becomes independent query params
211        // that PostgREST ANDs with the rest; the user cannot widen past them.
212        for forced in &query.forced_filters {
213            if let Some(filter_params) = build_postgrest_filter(forced) {
214                params.extend(filter_params);
215            }
216        }
217
218        // Search (LIKE on the data JSON column)
219        if let Some(ref search) = query.search {
220            if !search.is_empty() {
221                params.push(format!("data=ilike.*{}*", urlencoding::encode(search)));
222            }
223        }
224
225        // Sort
226        if let Some(ref sort) = query.sort {
227            let (field, desc) = if let Some((f, d)) = sort.rsplit_once(':') {
228                (f, d.eq_ignore_ascii_case("desc"))
229            } else {
230                (sort.as_str(), false)
231            };
232            validate_identifier(field)?;
233            params.push(format!(
234                "order=data->>{}.{}",
235                field,
236                if desc { "desc" } else { "asc" }
237            ));
238        }
239
240        // Limit + Offset via Range header would be cleaner, but query params work too
241        if let Some(limit) = query.limit {
242            params.push(format!("limit={}", limit));
243        }
244        if let Some(offset) = query.offset {
245            params.push(format!("offset={}", offset));
246        }
247
248        let query_string = params.join("&");
249        let rows = self.get_rows(name, &query_string).await?;
250        Ok(rows
251            .into_iter()
252            .map(|row| Self::row_to_item(&row))
253            .collect())
254    }
255
256    pub async fn find_by(
257        &self,
258        collection: &str,
259        field: &str,
260        value: &Value,
261    ) -> Result<Vec<Value>> {
262        validate_identifier(collection)?;
263        validate_identifier(field)?;
264        let val_str = match value {
265            Value::String(s) => s.clone(),
266            other => other.to_string(),
267        };
268        let query = format!(
269            "select=id,data&data->>{}=eq.{}",
270            field,
271            urlencoding::encode(&val_str)
272        );
273        let rows = self.get_rows(collection, &query).await?;
274        Ok(rows
275            .into_iter()
276            .map(|row| Self::row_to_item(&row))
277            .collect())
278    }
279
280    pub async fn find_one_by(
281        &self,
282        collection: &str,
283        field: &str,
284        value: &Value,
285    ) -> Result<Option<Value>> {
286        validate_identifier(collection)?;
287        validate_identifier(field)?;
288        let val_str = match value {
289            Value::String(s) => s.clone(),
290            other => other.to_string(),
291        };
292        let query = format!(
293            "select=id,data&data->>{}=eq.{}&limit=1",
294            field,
295            urlencoding::encode(&val_str)
296        );
297        let rows = self.get_rows(collection, &query).await?;
298        Ok(rows.into_iter().next().map(|row| Self::row_to_item(&row)))
299    }
300
301    pub async fn create(&self, collection: &str, mut item: Value) -> Result<Value> {
302        validate_identifier(collection)?;
303        // Remove id — auto-generated by BIGSERIAL
304        if let Value::Object(ref mut map) = item {
305            map.remove("id");
306        }
307
308        let body = json!({ "data": item });
309        let rows = self.insert_row(collection, &body).await?;
310
311        let row = rows
312            .into_iter()
313            .next()
314            .ok_or_else(|| crate::Error::Data("Supabase insert returned no rows".to_string()))?;
315
316        Ok(Self::row_to_item(&row))
317    }
318
319    pub async fn update(
320        &self,
321        collection: &str,
322        id: &Value,
323        updates: Value,
324    ) -> Result<Option<Value>> {
325        validate_identifier(collection)?;
326
327        // Fetch current row
328        let id_str = match id {
329            Value::Number(n) => n.to_string(),
330            Value::String(s) => s.clone(),
331            other => other.to_string(),
332        };
333        let query = format!("select=id,data&id=eq.{}", id_str);
334        let rows = self.get_rows(collection, &query).await?;
335
336        let row = match rows.into_iter().next() {
337            Some(r) => r,
338            None => return Ok(None),
339        };
340
341        let mut current = Self::row_to_item(&row);
342
343        // Merge updates into current data
344        if let (Value::Object(map), Value::Object(updates_map)) = (&mut current, &updates) {
345            for (k, v) in updates_map {
346                map.insert(k.clone(), v.clone());
347            }
348        }
349
350        // Strip id from data blob (stored separately in id column)
351        let mut data_for_storage = current.clone();
352        if let Value::Object(ref mut map) = data_for_storage {
353            map.remove("id");
354        }
355
356        let filter = format!("id=eq.{}", id_str);
357        self.patch_rows(collection, &filter, &json!({ "data": data_for_storage }))
358            .await?;
359
360        Ok(Some(current))
361    }
362
363    pub async fn delete(&self, collection: &str, id: &Value) -> Result<bool> {
364        validate_identifier(collection)?;
365        let id_str = match id {
366            Value::Number(n) => n.to_string(),
367            Value::String(s) => s.clone(),
368            other => other.to_string(),
369        };
370        let filter = format!("id=eq.{}", id_str);
371        let deleted = self.delete_rows(collection, &filter).await?;
372        Ok(!deleted.is_empty())
373    }
374
375    // -----------------------------------------------------------------------
376    // Key-value operations
377    // -----------------------------------------------------------------------
378
379    pub async fn set(&self, key: &str, value: Value) -> Result<()> {
380        let value_str = serde_json::to_string(&value)?;
381        // Upsert: use Prefer: resolution=merge-duplicates
382        let url = self.rest_url("_kv_store");
383        let resp = self
384            .client
385            .post(&url)
386            .header("apikey", &self.api_key)
387            .header("Authorization", format!("Bearer {}", self.api_key))
388            .header("Content-Type", "application/json")
389            .header("Prefer", "resolution=merge-duplicates")
390            .json(&json!({ "key": key, "value": value_str }))
391            .send()
392            .await
393            .map_err(|e| crate::Error::Data(format!("Supabase KV set failed: {}", e)))?;
394
395        if !resp.status().is_success() {
396            let text = resp.text().await.unwrap_or_default();
397            return Err(crate::Error::Data(format!(
398                "Supabase KV set error: {}",
399                text
400            )));
401        }
402        Ok(())
403    }
404
405    pub async fn get(&self, key: &str) -> Result<Option<Value>> {
406        let query = format!("select=value&key=eq.{}", urlencoding::encode(key));
407        let rows = self.get_rows("_kv_store", &query).await?;
408
409        Ok(rows.into_iter().next().and_then(|row| {
410            row.get("value")
411                .and_then(|v| v.as_str())
412                .and_then(|s| serde_json::from_str(s).ok())
413        }))
414    }
415
416    pub async fn remove(&self, key: &str) -> Result<Option<Value>> {
417        let existing = self.get(key).await?;
418        if existing.is_some() {
419            let filter = format!("key=eq.{}", urlencoding::encode(key));
420            self.delete_rows("_kv_store", &filter).await?;
421        }
422        Ok(existing)
423    }
424
425    pub async fn atomic_modify<F>(&self, key: &str, f: F) -> Result<Value>
426    where
427        F: FnOnce(Option<&Value>) -> Value,
428    {
429        let current = self.get(key).await?;
430        let new_value = f(current.as_ref());
431        self.set(key, new_value.clone()).await?;
432        Ok(new_value)
433    }
434
435    // -----------------------------------------------------------------------
436    // Context & bulk operations
437    // -----------------------------------------------------------------------
438
439    pub async fn as_context(&self) -> Result<HashMap<String, Value>> {
440        let mut context = HashMap::new();
441
442        // Load all collections
443        let collections = self
444            .get_rows("_collections", "select=name")
445            .await
446            .unwrap_or_default();
447        for row in collections {
448            if let Some(name) = row.get("name").and_then(|v| v.as_str()) {
449                if let Ok(items) = self.get_collection(name).await {
450                    context.insert(name.to_string(), json!(items));
451                }
452            }
453        }
454
455        // Load all KV pairs
456        let kv_rows = self
457            .get_rows("_kv_store", "select=key,value")
458            .await
459            .unwrap_or_default();
460        for row in kv_rows {
461            if let (Some(key), Some(value_str)) = (
462                row.get("key").and_then(|v| v.as_str()),
463                row.get("value").and_then(|v| v.as_str()),
464            ) {
465                if let Ok(value) = serde_json::from_str::<Value>(value_str) {
466                    context.insert(key.to_string(), value);
467                }
468            }
469        }
470
471        Ok(context)
472    }
473
474    pub async fn set_collection(&self, name: &str, items: Vec<Value>) -> Result<()> {
475        validate_identifier(name)?;
476        // Clear existing data
477        // PostgREST requires a filter for DELETE — use a truthy condition
478        let url = format!("{}?id=gt.0", self.rest_url(name));
479        self.client
480            .delete(&url)
481            .header("apikey", &self.api_key)
482            .header("Authorization", format!("Bearer {}", self.api_key))
483            .send()
484            .await
485            .map_err(|e| crate::Error::Data(format!("Supabase clear collection failed: {}", e)))?;
486
487        // Insert new items
488        for item in items {
489            self.create(name, item).await?;
490        }
491
492        Ok(())
493    }
494
495    // -----------------------------------------------------------------------
496    // Helpers
497    // -----------------------------------------------------------------------
498
499    /// Convert a PostgREST row (with `id` and `data` columns) to a merged JSON item
500    fn row_to_item(row: &Value) -> Value {
501        let id = row.get("id").cloned().unwrap_or(json!(0));
502        let data = row.get("data");
503
504        // data can be a JSON object directly (JSONB) or a string (TEXT)
505        let mut item: Value = match data {
506            Some(Value::Object(map)) => Value::Object(map.clone()),
507            Some(Value::String(s)) => serde_json::from_str(s).unwrap_or(json!({})),
508            _ => json!({}),
509        };
510
511        if let Value::Object(ref mut map) = item {
512            map.insert("id".to_string(), id);
513        }
514        item
515    }
516}
517
518/// Build PostgREST filter query parameters from our filter expression syntax.
519/// Returns a list of query parameter strings.
520fn build_postgrest_filter(filter_expr: &str) -> Option<Vec<String>> {
521    let or_groups: Vec<&str> = filter_expr.split(',').collect();
522
523    if or_groups.len() == 1 {
524        // Simple AND conditions
525        let and_conditions: Vec<&str> = or_groups[0].split('&').collect();
526        let mut params = Vec::new();
527        for cond in and_conditions {
528            if let Some(param) = build_postgrest_condition(cond.trim()) {
529                params.push(param);
530            }
531        }
532        if params.is_empty() {
533            None
534        } else {
535            Some(params)
536        }
537    } else {
538        // OR groups — use PostgREST `or` syntax
539        let mut or_parts = Vec::new();
540        for group in or_groups {
541            let and_conditions: Vec<&str> = group.split('&').collect();
542            let mut and_parts = Vec::new();
543            for cond in and_conditions {
544                if let Some(part) = build_postgrest_condition_part(cond.trim()) {
545                    and_parts.push(part);
546                }
547            }
548            if and_parts.len() == 1 {
549                or_parts.push(and_parts.into_iter().next().unwrap());
550            } else if !and_parts.is_empty() {
551                or_parts.push(format!("and({})", and_parts.join(",")));
552            }
553        }
554        if or_parts.is_empty() {
555            None
556        } else {
557            Some(vec![format!("or=({})", or_parts.join(","))])
558        }
559    }
560}
561
562/// Build a single PostgREST filter parameter (e.g., "data->>field=eq.value")
563fn build_postgrest_condition(cond: &str) -> Option<String> {
564    let operators = [
565        (">=", "gte"),
566        ("<=", "lte"),
567        (">", "gt"),
568        ("<", "lt"),
569        ("=", "eq"),
570    ];
571    for (op, pg_op) in operators {
572        if let Some((field, val)) = cond.split_once(op) {
573            let field = field.trim();
574            let val = val.trim();
575            if !SAFE_IDENTIFIER_RE.is_match(field) {
576                return None;
577            }
578            return Some(format!(
579                "data->>{}={}.{}",
580                field,
581                pg_op,
582                urlencoding::encode(val)
583            ));
584        }
585    }
586    None
587}
588
589/// Build a PostgREST condition part for use inside or() syntax
590fn build_postgrest_condition_part(cond: &str) -> Option<String> {
591    let operators = [
592        (">=", "gte"),
593        ("<=", "lte"),
594        (">", "gt"),
595        ("<", "lt"),
596        ("=", "eq"),
597    ];
598    for (op, pg_op) in operators {
599        if let Some((field, val)) = cond.split_once(op) {
600            let field = field.trim();
601            let val = val.trim();
602            if !SAFE_IDENTIFIER_RE.is_match(field) {
603                return None;
604            }
605            return Some(format!(
606                "data->>{}.{}.{}",
607                field,
608                pg_op,
609                urlencoding::encode(val)
610            ));
611        }
612    }
613    None
614}