1use crate::{
4 error::{Error, Result},
5 types::{FilterOperator, JsonValue, OrderDirection, SupabaseConfig},
6};
7use reqwest::Client as HttpClient;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, sync::Arc};
10use tracing::{debug, info};
11use url::Url;
12
13#[derive(Debug, Clone)]
15pub struct Database {
16 http_client: Arc<HttpClient>,
17 config: Arc<SupabaseConfig>,
18}
19
20#[derive(Debug, Clone)]
22pub struct QueryBuilder {
23 database: Database,
24 table: String,
25 columns: Option<String>,
26 filters: Vec<Filter>,
27 order_by: Vec<OrderBy>,
28 limit: Option<u32>,
29 offset: Option<u32>,
30 single: bool,
31}
32
33#[derive(Debug, Clone)]
35pub struct InsertBuilder {
36 database: Database,
37 table: String,
38 data: JsonValue,
39 upsert: bool,
40 on_conflict: Option<String>,
41 returning: Option<String>,
42}
43
44#[derive(Debug, Clone)]
46pub struct UpdateBuilder {
47 database: Database,
48 table: String,
49 data: JsonValue,
50 filters: Vec<Filter>,
51 returning: Option<String>,
52}
53
54#[derive(Debug, Clone)]
56pub struct DeleteBuilder {
57 database: Database,
58 table: String,
59 filters: Vec<Filter>,
60 returning: Option<String>,
61}
62
63#[derive(Debug, Clone)]
65struct Filter {
66 column: String,
67 operator: FilterOperator,
68 value: String,
69}
70
71#[derive(Debug, Clone)]
73struct OrderBy {
74 column: String,
75 direction: OrderDirection,
76 #[allow(dead_code)]
77 nulls_first: Option<bool>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct DatabaseResponse<T> {
83 pub data: T,
84 pub count: Option<u64>,
85}
86
87impl Database {
88 pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
90 debug!("Initializing Database module");
91
92 Ok(Self {
93 http_client,
94 config,
95 })
96 }
97
98 pub fn from(&self, table: &str) -> QueryBuilder {
100 QueryBuilder::new(self.clone(), table.to_string())
101 }
102
103 pub fn insert(&self, table: &str) -> InsertBuilder {
105 InsertBuilder::new(self.clone(), table.to_string())
106 }
107
108 pub fn update(&self, table: &str) -> UpdateBuilder {
110 UpdateBuilder::new(self.clone(), table.to_string())
111 }
112
113 pub fn delete(&self, table: &str) -> DeleteBuilder {
115 DeleteBuilder::new(self.clone(), table.to_string())
116 }
117
118 pub async fn rpc(&self, function_name: &str, params: Option<JsonValue>) -> Result<JsonValue> {
120 debug!("Executing RPC function: {}", function_name);
121
122 let url = format!("{}/rest/v1/rpc/{}", self.config.url, function_name);
123
124 let mut request = self.http_client.post(&url);
125
126 if let Some(params) = params {
127 request = request.json(¶ms);
128 }
129
130 let response = request.send().await?;
131
132 if !response.status().is_success() {
133 let status = response.status();
134 let error_msg = match response.text().await {
135 Ok(text) => text,
136 Err(_) => format!("RPC failed with status: {}", status),
137 };
138 return Err(Error::database(error_msg));
139 }
140
141 let result: JsonValue = response.json().await?;
142 info!("RPC function {} executed successfully", function_name);
143
144 Ok(result)
145 }
146
147 fn rest_url(&self) -> String {
149 format!("{}/rest/v1", self.config.url)
150 }
151
152 fn build_query_params(&self, filters: &[Filter]) -> HashMap<String, String> {
154 let mut params = HashMap::new();
155
156 for filter in filters {
157 let (key, value) = match filter.operator {
158 FilterOperator::Equal => (filter.column.clone(), format!("eq.{}", filter.value)),
159 FilterOperator::NotEqual => {
160 (filter.column.clone(), format!("neq.{}", filter.value))
161 }
162 FilterOperator::GreaterThan => {
163 (filter.column.clone(), format!("gt.{}", filter.value))
164 }
165 FilterOperator::GreaterThanOrEqual => {
166 (filter.column.clone(), format!("gte.{}", filter.value))
167 }
168 FilterOperator::LessThan => (filter.column.clone(), format!("lt.{}", filter.value)),
169 FilterOperator::LessThanOrEqual => {
170 (filter.column.clone(), format!("lte.{}", filter.value))
171 }
172 FilterOperator::Like => (filter.column.clone(), format!("like.{}", filter.value)),
173 FilterOperator::ILike => (filter.column.clone(), format!("ilike.{}", filter.value)),
174 FilterOperator::Is => (filter.column.clone(), format!("is.{}", filter.value)),
175 FilterOperator::In => (filter.column.clone(), format!("in.{}", filter.value)),
176 FilterOperator::Contains => (filter.column.clone(), format!("cs.{}", filter.value)),
177 FilterOperator::ContainedBy => {
178 (filter.column.clone(), format!("cd.{}", filter.value))
179 }
180 FilterOperator::StrictlyLeft => {
181 (filter.column.clone(), format!("sl.{}", filter.value))
182 }
183 FilterOperator::StrictlyRight => {
184 (filter.column.clone(), format!("sr.{}", filter.value))
185 }
186 FilterOperator::NotExtendToRight => {
187 (filter.column.clone(), format!("nxr.{}", filter.value))
188 }
189 FilterOperator::NotExtendToLeft => {
190 (filter.column.clone(), format!("nxl.{}", filter.value))
191 }
192 FilterOperator::Adjacent => {
193 (filter.column.clone(), format!("adj.{}", filter.value))
194 }
195 };
196
197 params.insert(key, value);
198 }
199
200 params
201 }
202}
203
204impl QueryBuilder {
205 fn new(database: Database, table: String) -> Self {
206 Self {
207 database,
208 table,
209 columns: None,
210 filters: Vec::new(),
211 order_by: Vec::new(),
212 limit: None,
213 offset: None,
214 single: false,
215 }
216 }
217
218 pub fn select(mut self, columns: &str) -> Self {
220 self.columns = Some(columns.to_string());
221 self
222 }
223
224 pub fn eq(mut self, column: &str, value: &str) -> Self {
226 self.filters.push(Filter {
227 column: column.to_string(),
228 operator: FilterOperator::Equal,
229 value: value.to_string(),
230 });
231 self
232 }
233
234 pub fn neq(mut self, column: &str, value: &str) -> Self {
236 self.filters.push(Filter {
237 column: column.to_string(),
238 operator: FilterOperator::NotEqual,
239 value: value.to_string(),
240 });
241 self
242 }
243
244 pub fn gt(mut self, column: &str, value: &str) -> Self {
246 self.filters.push(Filter {
247 column: column.to_string(),
248 operator: FilterOperator::GreaterThan,
249 value: value.to_string(),
250 });
251 self
252 }
253
254 pub fn gte(mut self, column: &str, value: &str) -> Self {
256 self.filters.push(Filter {
257 column: column.to_string(),
258 operator: FilterOperator::GreaterThanOrEqual,
259 value: value.to_string(),
260 });
261 self
262 }
263
264 pub fn lt(mut self, column: &str, value: &str) -> Self {
266 self.filters.push(Filter {
267 column: column.to_string(),
268 operator: FilterOperator::LessThan,
269 value: value.to_string(),
270 });
271 self
272 }
273
274 pub fn lte(mut self, column: &str, value: &str) -> Self {
276 self.filters.push(Filter {
277 column: column.to_string(),
278 operator: FilterOperator::LessThanOrEqual,
279 value: value.to_string(),
280 });
281 self
282 }
283
284 pub fn like(mut self, column: &str, pattern: &str) -> Self {
286 self.filters.push(Filter {
287 column: column.to_string(),
288 operator: FilterOperator::Like,
289 value: pattern.to_string(),
290 });
291 self
292 }
293
294 pub fn ilike(mut self, column: &str, pattern: &str) -> Self {
296 self.filters.push(Filter {
297 column: column.to_string(),
298 operator: FilterOperator::ILike,
299 value: pattern.to_string(),
300 });
301 self
302 }
303
304 pub fn is(mut self, column: &str, value: &str) -> Self {
306 self.filters.push(Filter {
307 column: column.to_string(),
308 operator: FilterOperator::Is,
309 value: value.to_string(),
310 });
311 self
312 }
313
314 pub fn r#in(mut self, column: &str, values: &[&str]) -> Self {
316 let value = format!("({})", values.join(","));
317 self.filters.push(Filter {
318 column: column.to_string(),
319 operator: FilterOperator::In,
320 value,
321 });
322 self
323 }
324
325 pub fn order(mut self, column: &str, direction: OrderDirection) -> Self {
327 self.order_by.push(OrderBy {
328 column: column.to_string(),
329 direction,
330 nulls_first: None,
331 });
332 self
333 }
334
335 pub fn limit(mut self, limit: u32) -> Self {
337 self.limit = Some(limit);
338 self
339 }
340
341 pub fn offset(mut self, offset: u32) -> Self {
343 self.offset = Some(offset);
344 self
345 }
346
347 pub fn single(mut self) -> Self {
349 self.single = true;
350 self
351 }
352
353 pub async fn execute<T>(&self) -> Result<Vec<T>>
355 where
356 T: for<'de> Deserialize<'de>,
357 {
358 debug!("Executing SELECT query on table: {}", self.table);
359
360 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
361
362 let mut query_params = self.database.build_query_params(&self.filters);
364
365 if let Some(ref columns) = self.columns {
366 query_params.insert("select".to_string(), columns.clone());
367 }
368
369 if !self.order_by.is_empty() {
370 let order_clauses: Vec<String> = self
371 .order_by
372 .iter()
373 .map(|order| {
374 let direction = match order.direction {
375 OrderDirection::Ascending => "asc",
376 OrderDirection::Descending => "desc",
377 };
378 format!("{}.{}", order.column, direction)
379 })
380 .collect();
381 query_params.insert("order".to_string(), order_clauses.join(","));
382 }
383
384 if let Some(limit) = self.limit {
385 query_params.insert("limit".to_string(), limit.to_string());
386 }
387
388 if let Some(offset) = self.offset {
389 query_params.insert("offset".to_string(), offset.to_string());
390 }
391
392 for (key, value) in query_params {
394 url.query_pairs_mut().append_pair(&key, &value);
395 }
396
397 debug!("Generated query URL: {}", url.as_str());
398 let mut request = self.database.http_client.get(url.as_str());
399
400 if self.single {
401 request = request.header("Accept", "application/vnd.pgrst.object+json");
402 }
403
404 let response = request.send().await?;
405
406 if !response.status().is_success() {
407 let status = response.status();
408 let error_msg = match response.text().await {
409 Ok(text) => text,
410 Err(_) => format!("Query failed with status: {}", status),
411 };
412 return Err(Error::database(error_msg));
413 }
414
415 let result = if self.single {
416 let single_item: T = response.json().await?;
417 vec![single_item]
418 } else {
419 response.json().await?
420 };
421
422 info!(
423 "SELECT query executed successfully on table: {}",
424 self.table
425 );
426 Ok(result)
427 }
428
429 pub async fn single_execute<T>(&self) -> Result<Option<T>>
431 where
432 T: for<'de> Deserialize<'de>,
433 {
434 let mut builder = self.clone();
435 builder.single = true;
436
437 let results = builder.execute().await?;
438 Ok(results.into_iter().next())
439 }
440}
441
442impl InsertBuilder {
443 fn new(database: Database, table: String) -> Self {
444 Self {
445 database,
446 table,
447 data: JsonValue::Null,
448 upsert: false,
449 on_conflict: None,
450 returning: None,
451 }
452 }
453
454 pub fn values<T: Serialize>(mut self, data: T) -> Result<Self> {
456 self.data = serde_json::to_value(data)?;
457 Ok(self)
458 }
459
460 pub fn upsert(mut self) -> Self {
462 self.upsert = true;
463 self
464 }
465
466 pub fn on_conflict(mut self, columns: &str) -> Self {
468 self.on_conflict = Some(columns.to_string());
469 self
470 }
471
472 pub fn returning(mut self, columns: &str) -> Self {
474 self.returning = Some(columns.to_string());
475 self
476 }
477
478 pub async fn execute<T>(&self) -> Result<Vec<T>>
480 where
481 T: for<'de> Deserialize<'de>,
482 {
483 debug!("Executing INSERT query on table: {}", self.table);
484
485 let url = format!("{}/{}", self.database.rest_url(), self.table);
486 let mut request = self.database.http_client.post(&url).json(&self.data);
487
488 if let Some(ref _returning) = self.returning {
489 request = request.header("Prefer", "return=representation".to_string());
490 }
491
492 if self.upsert {
493 request = request.header("Prefer", "resolution=merge-duplicates");
494 }
495
496 let response = request.send().await?;
497
498 if !response.status().is_success() {
499 let status = response.status();
500 let error_msg = match response.text().await {
501 Ok(text) => text,
502 Err(_) => format!("Insert failed with status: {}", status),
503 };
504 return Err(Error::database(error_msg));
505 }
506
507 let result: Vec<T> = response.json().await?;
508 info!(
509 "INSERT query executed successfully on table: {}",
510 self.table
511 );
512
513 Ok(result)
514 }
515}
516
517impl UpdateBuilder {
518 fn new(database: Database, table: String) -> Self {
519 Self {
520 database,
521 table,
522 data: JsonValue::Null,
523 filters: Vec::new(),
524 returning: None,
525 }
526 }
527
528 pub fn set<T: Serialize>(mut self, data: T) -> Result<Self> {
530 self.data = serde_json::to_value(data)?;
531 Ok(self)
532 }
533
534 pub fn eq(mut self, column: &str, value: &str) -> Self {
536 self.filters.push(Filter {
537 column: column.to_string(),
538 operator: FilterOperator::Equal,
539 value: value.to_string(),
540 });
541 self
542 }
543
544 pub fn returning(mut self, columns: &str) -> Self {
546 self.returning = Some(columns.to_string());
547 self
548 }
549
550 pub async fn execute<T>(&self) -> Result<Vec<T>>
552 where
553 T: for<'de> Deserialize<'de>,
554 {
555 debug!("Executing UPDATE query on table: {}", self.table);
556
557 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
558
559 let query_params = self.database.build_query_params(&self.filters);
561 for (key, value) in query_params {
562 url.query_pairs_mut().append_pair(&key, &value);
563 }
564
565 let mut request = self
566 .database
567 .http_client
568 .patch(url.as_str())
569 .json(&self.data);
570
571 if let Some(ref _returning) = self.returning {
572 request = request.header("Prefer", "return=representation");
573 }
574
575 let response = request.send().await?;
576
577 if !response.status().is_success() {
578 let status = response.status();
579 let error_msg = match response.text().await {
580 Ok(text) => text,
581 Err(_) => format!("Update failed with status: {}", status),
582 };
583 return Err(Error::database(error_msg));
584 }
585
586 let result: Vec<T> = response.json().await?;
587 info!(
588 "UPDATE query executed successfully on table: {}",
589 self.table
590 );
591
592 Ok(result)
593 }
594}
595
596impl DeleteBuilder {
597 fn new(database: Database, table: String) -> Self {
598 Self {
599 database,
600 table,
601 filters: Vec::new(),
602 returning: None,
603 }
604 }
605
606 pub fn eq(mut self, column: &str, value: &str) -> Self {
608 self.filters.push(Filter {
609 column: column.to_string(),
610 operator: FilterOperator::Equal,
611 value: value.to_string(),
612 });
613 self
614 }
615
616 pub fn returning(mut self, columns: &str) -> Self {
618 self.returning = Some(columns.to_string());
619 self
620 }
621
622 pub async fn execute<T>(&self) -> Result<Vec<T>>
624 where
625 T: for<'de> Deserialize<'de>,
626 {
627 debug!("Executing DELETE query on table: {}", self.table);
628
629 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
630
631 let query_params = self.database.build_query_params(&self.filters);
633 for (key, value) in query_params {
634 url.query_pairs_mut().append_pair(&key, &value);
635 }
636
637 let mut request = self.database.http_client.delete(url.as_str());
638
639 if let Some(ref _returning) = self.returning {
640 request = request.header("Prefer", "return=representation");
641 }
642
643 let response = request.send().await?;
644
645 if !response.status().is_success() {
646 let status = response.status();
647 let error_msg = match response.text().await {
648 Ok(text) => text,
649 Err(_) => format!("Delete failed with status: {}", status),
650 };
651 return Err(Error::database(error_msg));
652 }
653
654 let result: Vec<T> = response.json().await?;
655 info!(
656 "DELETE query executed successfully on table: {}",
657 self.table
658 );
659
660 Ok(result)
661 }
662}