supabase/
database.rs

1//! Database module for Supabase REST API
2
3use crate::{
4    error::{Error, Result},
5    types::{FilterOperator, JsonValue, OrderDirection, SupabaseConfig},
6};
7use reqwest::Client as HttpClient;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, sync::Arc};
10use tracing::{debug, info};
11use url::Url;
12
/// Database client for REST API operations.
///
/// Both fields sit behind `Arc`, so the derived `Clone` only copies two
/// reference-counted pointers. Every builder created by `from`, `insert`,
/// `update` or `delete` stores its own clone of this client.
#[derive(Debug, Clone)]
pub struct Database {
    /// Shared HTTP client used to send every request.
    http_client: Arc<HttpClient>,
    /// Shared configuration; `config.url` is the base the `/rest/v1` paths are
    /// appended to.
    config: Arc<SupabaseConfig>,
}
19
20/// Query builder for SELECT operations
21#[derive(Debug, Clone)]
22pub struct QueryBuilder {
23    database: Database,
24    table: String,
25    columns: Option<String>,
26    filters: Vec<Filter>,
27    order_by: Vec<OrderBy>,
28    limit: Option<u32>,
29    offset: Option<u32>,
30    single: bool,
31}
32
33/// Insert builder for INSERT operations
34#[derive(Debug, Clone)]
35pub struct InsertBuilder {
36    database: Database,
37    table: String,
38    data: JsonValue,
39    upsert: bool,
40    on_conflict: Option<String>,
41    returning: Option<String>,
42}
43
44/// Update builder for UPDATE operations
45#[derive(Debug, Clone)]
46pub struct UpdateBuilder {
47    database: Database,
48    table: String,
49    data: JsonValue,
50    filters: Vec<Filter>,
51    returning: Option<String>,
52}
53
54/// Delete builder for DELETE operations
55#[derive(Debug, Clone)]
56pub struct DeleteBuilder {
57    database: Database,
58    table: String,
59    filters: Vec<Filter>,
60    returning: Option<String>,
61}
62
/// Database filter for WHERE clauses.
///
/// One instance is pushed per builder filter call (`eq`, `gt`, `like`, ...)
/// and later turned into a URL query parameter.
#[derive(Debug, Clone)]
struct Filter {
    /// Name of the column the condition applies to.
    column: String,
    /// Comparison to perform.
    operator: FilterOperator,
    /// Right-hand side of the comparison, already rendered as text by the
    /// builder method that created the filter.
    value: String,
}
70
/// Order by clause.
#[derive(Debug, Clone)]
struct OrderBy {
    /// Column to sort on.
    column: String,
    /// Sort direction (ascending or descending).
    direction: OrderDirection,
    // The lint is silenced because nothing in this file reads the field yet:
    // `QueryBuilder::order` always stores `None` and the `order` query
    // parameter is built from `column` and `direction` only.
    #[allow(dead_code)]
    nulls_first: Option<bool>,
}
79
/// Database response wrapper.
///
/// Pairs a decoded payload with an optional count. Nothing in this file
/// constructs it; it is exposed for callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseResponse<T> {
    /// The decoded payload.
    pub data: T,
    /// Optional count accompanying the payload.
    // NOTE(review): presumably the total number of matching rows — confirm
    // against whatever populates this wrapper.
    pub count: Option<u64>,
}
86
87impl Database {
88    /// Create a new Database instance
89    pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
90        debug!("Initializing Database module");
91
92        Ok(Self {
93            http_client,
94            config,
95        })
96    }
97
98    /// Start a query from a table
99    pub fn from(&self, table: &str) -> QueryBuilder {
100        QueryBuilder::new(self.clone(), table.to_string())
101    }
102
103    /// Insert data into a table
104    pub fn insert(&self, table: &str) -> InsertBuilder {
105        InsertBuilder::new(self.clone(), table.to_string())
106    }
107
108    /// Update data in a table
109    pub fn update(&self, table: &str) -> UpdateBuilder {
110        UpdateBuilder::new(self.clone(), table.to_string())
111    }
112
113    /// Delete data from a table
114    pub fn delete(&self, table: &str) -> DeleteBuilder {
115        DeleteBuilder::new(self.clone(), table.to_string())
116    }
117
118    /// Execute a custom SQL query via RPC
119    pub async fn rpc(&self, function_name: &str, params: Option<JsonValue>) -> Result<JsonValue> {
120        debug!("Executing RPC function: {}", function_name);
121
122        let url = format!("{}/rest/v1/rpc/{}", self.config.url, function_name);
123
124        let mut request = self.http_client.post(&url);
125
126        if let Some(params) = params {
127            request = request.json(&params);
128        }
129
130        let response = request.send().await?;
131
132        if !response.status().is_success() {
133            let status = response.status();
134            let error_msg = match response.text().await {
135                Ok(text) => text,
136                Err(_) => format!("RPC failed with status: {}", status),
137            };
138            return Err(Error::database(error_msg));
139        }
140
141        let result: JsonValue = response.json().await?;
142        info!("RPC function {} executed successfully", function_name);
143
144        Ok(result)
145    }
146
147    /// Get the base REST URL
148    fn rest_url(&self) -> String {
149        format!("{}/rest/v1", self.config.url)
150    }
151
152    /// Build query parameters from filters
153    fn build_query_params(&self, filters: &[Filter]) -> HashMap<String, String> {
154        let mut params = HashMap::new();
155
156        for filter in filters {
157            let key = match filter.operator {
158                FilterOperator::Equal => filter.column.clone(),
159                FilterOperator::NotEqual => format!("{}:neq", filter.column),
160                FilterOperator::GreaterThan => format!("{}:gt", filter.column),
161                FilterOperator::GreaterThanOrEqual => format!("{}:gte", filter.column),
162                FilterOperator::LessThan => format!("{}:lt", filter.column),
163                FilterOperator::LessThanOrEqual => format!("{}:lte", filter.column),
164                FilterOperator::Like => format!("{}:like", filter.column),
165                FilterOperator::ILike => format!("{}:ilike", filter.column),
166                FilterOperator::Is => format!("{}:is", filter.column),
167                FilterOperator::In => format!("{}:in", filter.column),
168                FilterOperator::Contains => format!("{}:cs", filter.column),
169                FilterOperator::ContainedBy => format!("{}:cd", filter.column),
170                FilterOperator::StrictlyLeft => format!("{}:sl", filter.column),
171                FilterOperator::StrictlyRight => format!("{}:sr", filter.column),
172                FilterOperator::NotExtendToRight => format!("{}:nxr", filter.column),
173                FilterOperator::NotExtendToLeft => format!("{}:nxl", filter.column),
174                FilterOperator::Adjacent => format!("{}:adj", filter.column),
175            };
176
177            params.insert(key, filter.value.clone());
178        }
179
180        params
181    }
182}
183
184impl QueryBuilder {
185    fn new(database: Database, table: String) -> Self {
186        Self {
187            database,
188            table,
189            columns: None,
190            filters: Vec::new(),
191            order_by: Vec::new(),
192            limit: None,
193            offset: None,
194            single: false,
195        }
196    }
197
198    /// Select specific columns
199    pub fn select(mut self, columns: &str) -> Self {
200        self.columns = Some(columns.to_string());
201        self
202    }
203
204    /// Add an equality filter
205    pub fn eq(mut self, column: &str, value: &str) -> Self {
206        self.filters.push(Filter {
207            column: column.to_string(),
208            operator: FilterOperator::Equal,
209            value: value.to_string(),
210        });
211        self
212    }
213
214    /// Add a not equal filter
215    pub fn neq(mut self, column: &str, value: &str) -> Self {
216        self.filters.push(Filter {
217            column: column.to_string(),
218            operator: FilterOperator::NotEqual,
219            value: value.to_string(),
220        });
221        self
222    }
223
224    /// Add a greater than filter
225    pub fn gt(mut self, column: &str, value: &str) -> Self {
226        self.filters.push(Filter {
227            column: column.to_string(),
228            operator: FilterOperator::GreaterThan,
229            value: value.to_string(),
230        });
231        self
232    }
233
234    /// Add a greater than or equal filter
235    pub fn gte(mut self, column: &str, value: &str) -> Self {
236        self.filters.push(Filter {
237            column: column.to_string(),
238            operator: FilterOperator::GreaterThanOrEqual,
239            value: value.to_string(),
240        });
241        self
242    }
243
244    /// Add a less than filter
245    pub fn lt(mut self, column: &str, value: &str) -> Self {
246        self.filters.push(Filter {
247            column: column.to_string(),
248            operator: FilterOperator::LessThan,
249            value: value.to_string(),
250        });
251        self
252    }
253
254    /// Add a less than or equal filter
255    pub fn lte(mut self, column: &str, value: &str) -> Self {
256        self.filters.push(Filter {
257            column: column.to_string(),
258            operator: FilterOperator::LessThanOrEqual,
259            value: value.to_string(),
260        });
261        self
262    }
263
264    /// Add a LIKE filter
265    pub fn like(mut self, column: &str, pattern: &str) -> Self {
266        self.filters.push(Filter {
267            column: column.to_string(),
268            operator: FilterOperator::Like,
269            value: pattern.to_string(),
270        });
271        self
272    }
273
274    /// Add an ILIKE filter (case-insensitive)
275    pub fn ilike(mut self, column: &str, pattern: &str) -> Self {
276        self.filters.push(Filter {
277            column: column.to_string(),
278            operator: FilterOperator::ILike,
279            value: pattern.to_string(),
280        });
281        self
282    }
283
284    /// Add an IS filter (for null checks)
285    pub fn is(mut self, column: &str, value: &str) -> Self {
286        self.filters.push(Filter {
287            column: column.to_string(),
288            operator: FilterOperator::Is,
289            value: value.to_string(),
290        });
291        self
292    }
293
294    /// Add an IN filter
295    pub fn r#in(mut self, column: &str, values: &[&str]) -> Self {
296        let value = format!("({})", values.join(","));
297        self.filters.push(Filter {
298            column: column.to_string(),
299            operator: FilterOperator::In,
300            value,
301        });
302        self
303    }
304
305    /// Add ordering
306    pub fn order(mut self, column: &str, direction: OrderDirection) -> Self {
307        self.order_by.push(OrderBy {
308            column: column.to_string(),
309            direction,
310            nulls_first: None,
311        });
312        self
313    }
314
315    /// Set limit
316    pub fn limit(mut self, limit: u32) -> Self {
317        self.limit = Some(limit);
318        self
319    }
320
321    /// Set offset
322    pub fn offset(mut self, offset: u32) -> Self {
323        self.offset = Some(offset);
324        self
325    }
326
327    /// Return single row
328    pub fn single(mut self) -> Self {
329        self.single = true;
330        self
331    }
332
333    /// Execute the query
334    pub async fn execute<T>(&self) -> Result<Vec<T>>
335    where
336        T: for<'de> Deserialize<'de>,
337    {
338        debug!("Executing SELECT query on table: {}", self.table);
339
340        let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
341
342        // Add query parameters
343        let mut query_params = self.database.build_query_params(&self.filters);
344
345        if let Some(ref columns) = self.columns {
346            query_params.insert("select".to_string(), columns.clone());
347        }
348
349        if !self.order_by.is_empty() {
350            let order_clauses: Vec<String> = self
351                .order_by
352                .iter()
353                .map(|order| {
354                    let direction = match order.direction {
355                        OrderDirection::Ascending => "asc",
356                        OrderDirection::Descending => "desc",
357                    };
358                    format!("{}.{}", order.column, direction)
359                })
360                .collect();
361            query_params.insert("order".to_string(), order_clauses.join(","));
362        }
363
364        if let Some(limit) = self.limit {
365            query_params.insert("limit".to_string(), limit.to_string());
366        }
367
368        if let Some(offset) = self.offset {
369            query_params.insert("offset".to_string(), offset.to_string());
370        }
371
372        // Set URL query parameters
373        for (key, value) in query_params {
374            url.query_pairs_mut().append_pair(&key, &value);
375        }
376
377        let mut request = self.database.http_client.get(url.as_str());
378
379        if self.single {
380            request = request.header("Accept", "application/vnd.pgrst.object+json");
381        }
382
383        let response = request.send().await?;
384
385        if !response.status().is_success() {
386            let status = response.status();
387            let error_msg = match response.text().await {
388                Ok(text) => text,
389                Err(_) => format!("Query failed with status: {}", status),
390            };
391            return Err(Error::database(error_msg));
392        }
393
394        let result = if self.single {
395            let single_item: T = response.json().await?;
396            vec![single_item]
397        } else {
398            response.json().await?
399        };
400
401        info!(
402            "SELECT query executed successfully on table: {}",
403            self.table
404        );
405        Ok(result)
406    }
407
408    /// Execute the query and return a single row
409    pub async fn single_execute<T>(&self) -> Result<Option<T>>
410    where
411        T: for<'de> Deserialize<'de>,
412    {
413        let mut builder = self.clone();
414        builder.single = true;
415
416        let results = builder.execute().await?;
417        Ok(results.into_iter().next())
418    }
419}
420
421impl InsertBuilder {
422    fn new(database: Database, table: String) -> Self {
423        Self {
424            database,
425            table,
426            data: JsonValue::Null,
427            upsert: false,
428            on_conflict: None,
429            returning: None,
430        }
431    }
432
433    /// Set the data to insert
434    pub fn values<T: Serialize>(mut self, data: T) -> Result<Self> {
435        self.data = serde_json::to_value(data)?;
436        Ok(self)
437    }
438
439    /// Enable upsert mode
440    pub fn upsert(mut self) -> Self {
441        self.upsert = true;
442        self
443    }
444
445    /// Set conflict resolution
446    pub fn on_conflict(mut self, columns: &str) -> Self {
447        self.on_conflict = Some(columns.to_string());
448        self
449    }
450
451    /// Set columns to return
452    pub fn returning(mut self, columns: &str) -> Self {
453        self.returning = Some(columns.to_string());
454        self
455    }
456
457    /// Execute the insert
458    pub async fn execute<T>(&self) -> Result<Vec<T>>
459    where
460        T: for<'de> Deserialize<'de>,
461    {
462        debug!("Executing INSERT query on table: {}", self.table);
463
464        let url = format!("{}/{}", self.database.rest_url(), self.table);
465        let mut request = self.database.http_client.post(&url).json(&self.data);
466
467        if let Some(ref _returning) = self.returning {
468            request = request.header("Prefer", "return=representation".to_string());
469        }
470
471        if self.upsert {
472            request = request.header("Prefer", "resolution=merge-duplicates");
473        }
474
475        let response = request.send().await?;
476
477        if !response.status().is_success() {
478            let status = response.status();
479            let error_msg = match response.text().await {
480                Ok(text) => text,
481                Err(_) => format!("Insert failed with status: {}", status),
482            };
483            return Err(Error::database(error_msg));
484        }
485
486        let result: Vec<T> = response.json().await?;
487        info!(
488            "INSERT query executed successfully on table: {}",
489            self.table
490        );
491
492        Ok(result)
493    }
494}
495
496impl UpdateBuilder {
497    fn new(database: Database, table: String) -> Self {
498        Self {
499            database,
500            table,
501            data: JsonValue::Null,
502            filters: Vec::new(),
503            returning: None,
504        }
505    }
506
507    /// Set the data to update
508    pub fn set<T: Serialize>(mut self, data: T) -> Result<Self> {
509        self.data = serde_json::to_value(data)?;
510        Ok(self)
511    }
512
513    /// Add an equality filter
514    pub fn eq(mut self, column: &str, value: &str) -> Self {
515        self.filters.push(Filter {
516            column: column.to_string(),
517            operator: FilterOperator::Equal,
518            value: value.to_string(),
519        });
520        self
521    }
522
523    /// Set columns to return
524    pub fn returning(mut self, columns: &str) -> Self {
525        self.returning = Some(columns.to_string());
526        self
527    }
528
529    /// Execute the update
530    pub async fn execute<T>(&self) -> Result<Vec<T>>
531    where
532        T: for<'de> Deserialize<'de>,
533    {
534        debug!("Executing UPDATE query on table: {}", self.table);
535
536        let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
537
538        // Add filters as query parameters
539        let query_params = self.database.build_query_params(&self.filters);
540        for (key, value) in query_params {
541            url.query_pairs_mut().append_pair(&key, &value);
542        }
543
544        let mut request = self
545            .database
546            .http_client
547            .patch(url.as_str())
548            .json(&self.data);
549
550        if let Some(ref _returning) = self.returning {
551            request = request.header("Prefer", "return=representation");
552        }
553
554        let response = request.send().await?;
555
556        if !response.status().is_success() {
557            let status = response.status();
558            let error_msg = match response.text().await {
559                Ok(text) => text,
560                Err(_) => format!("Update failed with status: {}", status),
561            };
562            return Err(Error::database(error_msg));
563        }
564
565        let result: Vec<T> = response.json().await?;
566        info!(
567            "UPDATE query executed successfully on table: {}",
568            self.table
569        );
570
571        Ok(result)
572    }
573}
574
575impl DeleteBuilder {
576    fn new(database: Database, table: String) -> Self {
577        Self {
578            database,
579            table,
580            filters: Vec::new(),
581            returning: None,
582        }
583    }
584
585    /// Add an equality filter
586    pub fn eq(mut self, column: &str, value: &str) -> Self {
587        self.filters.push(Filter {
588            column: column.to_string(),
589            operator: FilterOperator::Equal,
590            value: value.to_string(),
591        });
592        self
593    }
594
595    /// Set columns to return
596    pub fn returning(mut self, columns: &str) -> Self {
597        self.returning = Some(columns.to_string());
598        self
599    }
600
601    /// Execute the delete
602    pub async fn execute<T>(&self) -> Result<Vec<T>>
603    where
604        T: for<'de> Deserialize<'de>,
605    {
606        debug!("Executing DELETE query on table: {}", self.table);
607
608        let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
609
610        // Add filters as query parameters
611        let query_params = self.database.build_query_params(&self.filters);
612        for (key, value) in query_params {
613            url.query_pairs_mut().append_pair(&key, &value);
614        }
615
616        let mut request = self.database.http_client.delete(url.as_str());
617
618        if let Some(ref _returning) = self.returning {
619            request = request.header("Prefer", "return=representation");
620        }
621
622        let response = request.send().await?;
623
624        if !response.status().is_success() {
625            let status = response.status();
626            let error_msg = match response.text().await {
627                Ok(text) => text,
628                Err(_) => format!("Delete failed with status: {}", status),
629            };
630            return Err(Error::database(error_msg));
631        }
632
633        let result: Vec<T> = response.json().await?;
634        info!(
635            "DELETE query executed successfully on table: {}",
636            self.table
637        );
638
639        Ok(result)
640    }
641}