supabase/
database.rs

1//! Database module for Supabase REST API
2
3use crate::{
4    error::{Error, Result},
5    types::{FilterOperator, JsonValue, OrderDirection, SupabaseConfig},
6};
7use reqwest::Client as HttpClient;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, sync::Arc};
10use tracing::{debug, info};
11use url::Url;
12
13/// Database client for REST API operations
14#[derive(Debug, Clone)]
15pub struct Database {
16    http_client: Arc<HttpClient>,
17    config: Arc<SupabaseConfig>,
18}
19
20/// Query builder for SELECT operations
21#[derive(Debug, Clone)]
22pub struct QueryBuilder {
23    database: Database,
24    table: String,
25    columns: Option<String>,
26    filters: Vec<Filter>,
27    order_by: Vec<OrderBy>,
28    limit: Option<u32>,
29    offset: Option<u32>,
30    single: bool,
31}
32
33/// Insert builder for INSERT operations
34#[derive(Debug, Clone)]
35pub struct InsertBuilder {
36    database: Database,
37    table: String,
38    data: JsonValue,
39    upsert: bool,
40    on_conflict: Option<String>,
41    returning: Option<String>,
42}
43
44/// Update builder for UPDATE operations
45#[derive(Debug, Clone)]
46pub struct UpdateBuilder {
47    database: Database,
48    table: String,
49    data: JsonValue,
50    filters: Vec<Filter>,
51    returning: Option<String>,
52}
53
54/// Delete builder for DELETE operations
55#[derive(Debug, Clone)]
56pub struct DeleteBuilder {
57    database: Database,
58    table: String,
59    filters: Vec<Filter>,
60    returning: Option<String>,
61}
62
63/// Database filter for WHERE clauses
64#[derive(Debug, Clone)]
65struct Filter {
66    column: String,
67    operator: FilterOperator,
68    value: String,
69}
70
71/// Order by clause
72#[derive(Debug, Clone)]
73struct OrderBy {
74    column: String,
75    direction: OrderDirection,
76    #[allow(dead_code)]
77    nulls_first: Option<bool>,
78}
79
80/// Database response wrapper
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct DatabaseResponse<T> {
83    pub data: T,
84    pub count: Option<u64>,
85}
86
87impl Database {
88    /// Create a new Database instance
89    pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
90        debug!("Initializing Database module");
91
92        Ok(Self {
93            http_client,
94            config,
95        })
96    }
97
98    /// Start a query from a table
99    pub fn from(&self, table: &str) -> QueryBuilder {
100        QueryBuilder::new(self.clone(), table.to_string())
101    }
102
103    /// Insert data into a table
104    pub fn insert(&self, table: &str) -> InsertBuilder {
105        InsertBuilder::new(self.clone(), table.to_string())
106    }
107
108    /// Update data in a table
109    pub fn update(&self, table: &str) -> UpdateBuilder {
110        UpdateBuilder::new(self.clone(), table.to_string())
111    }
112
113    /// Delete data from a table
114    pub fn delete(&self, table: &str) -> DeleteBuilder {
115        DeleteBuilder::new(self.clone(), table.to_string())
116    }
117
118    /// Execute a custom SQL query via RPC
119    pub async fn rpc(&self, function_name: &str, params: Option<JsonValue>) -> Result<JsonValue> {
120        debug!("Executing RPC function: {}", function_name);
121
122        let url = format!("{}/rest/v1/rpc/{}", self.config.url, function_name);
123
124        let mut request = self.http_client.post(&url);
125
126        if let Some(params) = params {
127            request = request.json(&params);
128        }
129
130        let response = request.send().await?;
131
132        if !response.status().is_success() {
133            let status = response.status();
134            let error_msg = match response.text().await {
135                Ok(text) => text,
136                Err(_) => format!("RPC failed with status: {}", status),
137            };
138            return Err(Error::database(error_msg));
139        }
140
141        let result: JsonValue = response.json().await?;
142        info!("RPC function {} executed successfully", function_name);
143
144        Ok(result)
145    }
146
147    /// Get the base REST URL
148    fn rest_url(&self) -> String {
149        format!("{}/rest/v1", self.config.url)
150    }
151
152    /// Build query parameters from filters
153    fn build_query_params(&self, filters: &[Filter]) -> HashMap<String, String> {
154        let mut params = HashMap::new();
155
156        for filter in filters {
157            let (key, value) = match filter.operator {
158                FilterOperator::Equal => (filter.column.clone(), format!("eq.{}", filter.value)),
159                FilterOperator::NotEqual => {
160                    (filter.column.clone(), format!("neq.{}", filter.value))
161                }
162                FilterOperator::GreaterThan => {
163                    (filter.column.clone(), format!("gt.{}", filter.value))
164                }
165                FilterOperator::GreaterThanOrEqual => {
166                    (filter.column.clone(), format!("gte.{}", filter.value))
167                }
168                FilterOperator::LessThan => (filter.column.clone(), format!("lt.{}", filter.value)),
169                FilterOperator::LessThanOrEqual => {
170                    (filter.column.clone(), format!("lte.{}", filter.value))
171                }
172                FilterOperator::Like => (filter.column.clone(), format!("like.{}", filter.value)),
173                FilterOperator::ILike => (filter.column.clone(), format!("ilike.{}", filter.value)),
174                FilterOperator::Is => (filter.column.clone(), format!("is.{}", filter.value)),
175                FilterOperator::In => (filter.column.clone(), format!("in.{}", filter.value)),
176                FilterOperator::Contains => (filter.column.clone(), format!("cs.{}", filter.value)),
177                FilterOperator::ContainedBy => {
178                    (filter.column.clone(), format!("cd.{}", filter.value))
179                }
180                FilterOperator::StrictlyLeft => {
181                    (filter.column.clone(), format!("sl.{}", filter.value))
182                }
183                FilterOperator::StrictlyRight => {
184                    (filter.column.clone(), format!("sr.{}", filter.value))
185                }
186                FilterOperator::NotExtendToRight => {
187                    (filter.column.clone(), format!("nxr.{}", filter.value))
188                }
189                FilterOperator::NotExtendToLeft => {
190                    (filter.column.clone(), format!("nxl.{}", filter.value))
191                }
192                FilterOperator::Adjacent => {
193                    (filter.column.clone(), format!("adj.{}", filter.value))
194                }
195            };
196
197            params.insert(key, value);
198        }
199
200        params
201    }
202}
203
204impl QueryBuilder {
205    fn new(database: Database, table: String) -> Self {
206        Self {
207            database,
208            table,
209            columns: None,
210            filters: Vec::new(),
211            order_by: Vec::new(),
212            limit: None,
213            offset: None,
214            single: false,
215        }
216    }
217
218    /// Select specific columns
219    pub fn select(mut self, columns: &str) -> Self {
220        self.columns = Some(columns.to_string());
221        self
222    }
223
224    /// Add an equality filter
225    pub fn eq(mut self, column: &str, value: &str) -> Self {
226        self.filters.push(Filter {
227            column: column.to_string(),
228            operator: FilterOperator::Equal,
229            value: value.to_string(),
230        });
231        self
232    }
233
234    /// Add a not equal filter
235    pub fn neq(mut self, column: &str, value: &str) -> Self {
236        self.filters.push(Filter {
237            column: column.to_string(),
238            operator: FilterOperator::NotEqual,
239            value: value.to_string(),
240        });
241        self
242    }
243
244    /// Add a greater than filter
245    pub fn gt(mut self, column: &str, value: &str) -> Self {
246        self.filters.push(Filter {
247            column: column.to_string(),
248            operator: FilterOperator::GreaterThan,
249            value: value.to_string(),
250        });
251        self
252    }
253
254    /// Add a greater than or equal filter
255    pub fn gte(mut self, column: &str, value: &str) -> Self {
256        self.filters.push(Filter {
257            column: column.to_string(),
258            operator: FilterOperator::GreaterThanOrEqual,
259            value: value.to_string(),
260        });
261        self
262    }
263
264    /// Add a less than filter
265    pub fn lt(mut self, column: &str, value: &str) -> Self {
266        self.filters.push(Filter {
267            column: column.to_string(),
268            operator: FilterOperator::LessThan,
269            value: value.to_string(),
270        });
271        self
272    }
273
274    /// Add a less than or equal filter
275    pub fn lte(mut self, column: &str, value: &str) -> Self {
276        self.filters.push(Filter {
277            column: column.to_string(),
278            operator: FilterOperator::LessThanOrEqual,
279            value: value.to_string(),
280        });
281        self
282    }
283
284    /// Add a LIKE filter
285    pub fn like(mut self, column: &str, pattern: &str) -> Self {
286        self.filters.push(Filter {
287            column: column.to_string(),
288            operator: FilterOperator::Like,
289            value: pattern.to_string(),
290        });
291        self
292    }
293
294    /// Add an ILIKE filter (case-insensitive)
295    pub fn ilike(mut self, column: &str, pattern: &str) -> Self {
296        self.filters.push(Filter {
297            column: column.to_string(),
298            operator: FilterOperator::ILike,
299            value: pattern.to_string(),
300        });
301        self
302    }
303
304    /// Add an IS filter (for null checks)
305    pub fn is(mut self, column: &str, value: &str) -> Self {
306        self.filters.push(Filter {
307            column: column.to_string(),
308            operator: FilterOperator::Is,
309            value: value.to_string(),
310        });
311        self
312    }
313
314    /// Add an IN filter
315    pub fn r#in(mut self, column: &str, values: &[&str]) -> Self {
316        let value = format!("({})", values.join(","));
317        self.filters.push(Filter {
318            column: column.to_string(),
319            operator: FilterOperator::In,
320            value,
321        });
322        self
323    }
324
325    /// Add ordering
326    pub fn order(mut self, column: &str, direction: OrderDirection) -> Self {
327        self.order_by.push(OrderBy {
328            column: column.to_string(),
329            direction,
330            nulls_first: None,
331        });
332        self
333    }
334
335    /// Set limit
336    pub fn limit(mut self, limit: u32) -> Self {
337        self.limit = Some(limit);
338        self
339    }
340
341    /// Set offset
342    pub fn offset(mut self, offset: u32) -> Self {
343        self.offset = Some(offset);
344        self
345    }
346
347    /// Return single row
348    pub fn single(mut self) -> Self {
349        self.single = true;
350        self
351    }
352
353    /// Execute the query
354    pub async fn execute<T>(&self) -> Result<Vec<T>>
355    where
356        T: for<'de> Deserialize<'de>,
357    {
358        debug!("Executing SELECT query on table: {}", self.table);
359
360        let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
361
362        // Add query parameters
363        let mut query_params = self.database.build_query_params(&self.filters);
364
365        if let Some(ref columns) = self.columns {
366            query_params.insert("select".to_string(), columns.clone());
367        }
368
369        if !self.order_by.is_empty() {
370            let order_clauses: Vec<String> = self
371                .order_by
372                .iter()
373                .map(|order| {
374                    let direction = match order.direction {
375                        OrderDirection::Ascending => "asc",
376                        OrderDirection::Descending => "desc",
377                    };
378                    format!("{}.{}", order.column, direction)
379                })
380                .collect();
381            query_params.insert("order".to_string(), order_clauses.join(","));
382        }
383
384        if let Some(limit) = self.limit {
385            query_params.insert("limit".to_string(), limit.to_string());
386        }
387
388        if let Some(offset) = self.offset {
389            query_params.insert("offset".to_string(), offset.to_string());
390        }
391
392        // Set URL query parameters
393        for (key, value) in query_params {
394            url.query_pairs_mut().append_pair(&key, &value);
395        }
396
397        debug!("Generated query URL: {}", url.as_str());
398        let mut request = self.database.http_client.get(url.as_str());
399
400        if self.single {
401            request = request.header("Accept", "application/vnd.pgrst.object+json");
402        }
403
404        let response = request.send().await?;
405
406        if !response.status().is_success() {
407            let status = response.status();
408            let error_msg = match response.text().await {
409                Ok(text) => text,
410                Err(_) => format!("Query failed with status: {}", status),
411            };
412            return Err(Error::database(error_msg));
413        }
414
415        let result = if self.single {
416            let single_item: T = response.json().await?;
417            vec![single_item]
418        } else {
419            response.json().await?
420        };
421
422        info!(
423            "SELECT query executed successfully on table: {}",
424            self.table
425        );
426        Ok(result)
427    }
428
429    /// Execute the query and return a single row
430    pub async fn single_execute<T>(&self) -> Result<Option<T>>
431    where
432        T: for<'de> Deserialize<'de>,
433    {
434        let mut builder = self.clone();
435        builder.single = true;
436
437        let results = builder.execute().await?;
438        Ok(results.into_iter().next())
439    }
440}
441
442impl InsertBuilder {
443    fn new(database: Database, table: String) -> Self {
444        Self {
445            database,
446            table,
447            data: JsonValue::Null,
448            upsert: false,
449            on_conflict: None,
450            returning: None,
451        }
452    }
453
454    /// Set the data to insert
455    pub fn values<T: Serialize>(mut self, data: T) -> Result<Self> {
456        self.data = serde_json::to_value(data)?;
457        Ok(self)
458    }
459
460    /// Enable upsert mode
461    pub fn upsert(mut self) -> Self {
462        self.upsert = true;
463        self
464    }
465
466    /// Set conflict resolution
467    pub fn on_conflict(mut self, columns: &str) -> Self {
468        self.on_conflict = Some(columns.to_string());
469        self
470    }
471
472    /// Set columns to return
473    pub fn returning(mut self, columns: &str) -> Self {
474        self.returning = Some(columns.to_string());
475        self
476    }
477
478    /// Execute the insert
479    pub async fn execute<T>(&self) -> Result<Vec<T>>
480    where
481        T: for<'de> Deserialize<'de>,
482    {
483        debug!("Executing INSERT query on table: {}", self.table);
484
485        let url = format!("{}/{}", self.database.rest_url(), self.table);
486        let mut request = self.database.http_client.post(&url).json(&self.data);
487
488        if let Some(ref _returning) = self.returning {
489            request = request.header("Prefer", "return=representation".to_string());
490        }
491
492        if self.upsert {
493            request = request.header("Prefer", "resolution=merge-duplicates");
494        }
495
496        let response = request.send().await?;
497
498        if !response.status().is_success() {
499            let status = response.status();
500            let error_msg = match response.text().await {
501                Ok(text) => text,
502                Err(_) => format!("Insert failed with status: {}", status),
503            };
504            return Err(Error::database(error_msg));
505        }
506
507        let result: Vec<T> = response.json().await?;
508        info!(
509            "INSERT query executed successfully on table: {}",
510            self.table
511        );
512
513        Ok(result)
514    }
515}
516
517impl UpdateBuilder {
518    fn new(database: Database, table: String) -> Self {
519        Self {
520            database,
521            table,
522            data: JsonValue::Null,
523            filters: Vec::new(),
524            returning: None,
525        }
526    }
527
528    /// Set the data to update
529    pub fn set<T: Serialize>(mut self, data: T) -> Result<Self> {
530        self.data = serde_json::to_value(data)?;
531        Ok(self)
532    }
533
534    /// Add an equality filter
535    pub fn eq(mut self, column: &str, value: &str) -> Self {
536        self.filters.push(Filter {
537            column: column.to_string(),
538            operator: FilterOperator::Equal,
539            value: value.to_string(),
540        });
541        self
542    }
543
544    /// Set columns to return
545    pub fn returning(mut self, columns: &str) -> Self {
546        self.returning = Some(columns.to_string());
547        self
548    }
549
550    /// Execute the update
551    pub async fn execute<T>(&self) -> Result<Vec<T>>
552    where
553        T: for<'de> Deserialize<'de>,
554    {
555        debug!("Executing UPDATE query on table: {}", self.table);
556
557        let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
558
559        // Add filters as query parameters
560        let query_params = self.database.build_query_params(&self.filters);
561        for (key, value) in query_params {
562            url.query_pairs_mut().append_pair(&key, &value);
563        }
564
565        let mut request = self
566            .database
567            .http_client
568            .patch(url.as_str())
569            .json(&self.data);
570
571        if let Some(ref _returning) = self.returning {
572            request = request.header("Prefer", "return=representation");
573        }
574
575        let response = request.send().await?;
576
577        if !response.status().is_success() {
578            let status = response.status();
579            let error_msg = match response.text().await {
580                Ok(text) => text,
581                Err(_) => format!("Update failed with status: {}", status),
582            };
583            return Err(Error::database(error_msg));
584        }
585
586        let result: Vec<T> = response.json().await?;
587        info!(
588            "UPDATE query executed successfully on table: {}",
589            self.table
590        );
591
592        Ok(result)
593    }
594}
595
596impl DeleteBuilder {
597    fn new(database: Database, table: String) -> Self {
598        Self {
599            database,
600            table,
601            filters: Vec::new(),
602            returning: None,
603        }
604    }
605
606    /// Add an equality filter
607    pub fn eq(mut self, column: &str, value: &str) -> Self {
608        self.filters.push(Filter {
609            column: column.to_string(),
610            operator: FilterOperator::Equal,
611            value: value.to_string(),
612        });
613        self
614    }
615
616    /// Set columns to return
617    pub fn returning(mut self, columns: &str) -> Self {
618        self.returning = Some(columns.to_string());
619        self
620    }
621
622    /// Execute the delete
623    pub async fn execute<T>(&self) -> Result<Vec<T>>
624    where
625        T: for<'de> Deserialize<'de>,
626    {
627        debug!("Executing DELETE query on table: {}", self.table);
628
629        let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
630
631        // Add filters as query parameters
632        let query_params = self.database.build_query_params(&self.filters);
633        for (key, value) in query_params {
634            url.query_pairs_mut().append_pair(&key, &value);
635        }
636
637        let mut request = self.database.http_client.delete(url.as_str());
638
639        if let Some(ref _returning) = self.returning {
640            request = request.header("Prefer", "return=representation");
641        }
642
643        let response = request.send().await?;
644
645        if !response.status().is_success() {
646            let status = response.status();
647            let error_msg = match response.text().await {
648                Ok(text) => text,
649                Err(_) => format!("Delete failed with status: {}", status),
650            };
651            return Err(Error::database(error_msg));
652        }
653
654        let result: Vec<T> = response.json().await?;
655        info!(
656            "DELETE query executed successfully on table: {}",
657            self.table
658        );
659
660        Ok(result)
661    }
662}