1use crate::{
4 error::{Error, Result},
5 types::{FilterOperator, JsonValue, OrderDirection, SupabaseConfig},
6};
7use reqwest::Client as HttpClient;
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use std::{collections::HashMap, sync::Arc};
11use tracing::{debug, info};
12use url::Url;
13
14#[derive(Debug, Clone)]
16pub struct Database {
17 http_client: Arc<HttpClient>,
18 config: Arc<SupabaseConfig>,
19}
20
21#[derive(Debug, Clone)]
23pub struct QueryBuilder {
24 database: Database,
25 table: String,
26 columns: Option<String>,
27 filters: Vec<Filter>,
28 order_by: Vec<OrderBy>,
29 limit: Option<u32>,
30 offset: Option<u32>,
31 single: bool,
32 joins: Vec<Join>,
33}
34
35#[derive(Debug, Clone)]
37pub struct Join {
38 pub join_type: JoinType,
40 pub foreign_table: String,
42 pub foreign_columns: String,
44 pub alias: Option<String>,
46}
47
48#[derive(Debug, Clone)]
50pub enum JoinType {
51 Inner,
53 Left,
55}
56
57#[derive(Debug, Clone)]
59pub struct TransactionBuilder {
60 database: Database,
61 operations: Vec<JsonValue>,
62}
63
64#[derive(Debug, Clone, Serialize)]
66#[serde(rename_all = "lowercase")]
67pub enum TransactionOperation {
68 Insert,
69 Update,
70 Delete,
71 Select,
72 Rpc,
73}
74
75#[derive(Debug, Clone)]
77pub struct InsertBuilder {
78 database: Database,
79 table: String,
80 data: JsonValue,
81 upsert: bool,
82 on_conflict: Option<String>,
83 returning: Option<String>,
84}
85
86#[derive(Debug, Clone)]
88pub struct UpdateBuilder {
89 database: Database,
90 table: String,
91 data: JsonValue,
92 filters: Vec<Filter>,
93 returning: Option<String>,
94}
95
96#[derive(Debug, Clone)]
98pub struct DeleteBuilder {
99 database: Database,
100 table: String,
101 filters: Vec<Filter>,
102 returning: Option<String>,
103}
104
105#[derive(Debug, Clone)]
107pub enum Filter {
108 Simple {
110 column: String,
111 operator: FilterOperator,
112 value: String,
113 },
114 And(Vec<Filter>),
116 Or(Vec<Filter>),
118 Not(Box<Filter>),
120}
121
122#[derive(Debug, Clone)]
124struct OrderBy {
125 column: String,
126 direction: OrderDirection,
127 #[allow(dead_code)]
128 nulls_first: Option<bool>,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct DatabaseResponse<T> {
134 pub data: T,
135 pub count: Option<u64>,
136}
137
138impl Database {
139 pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
141 debug!("Initializing Database module");
142
143 Ok(Self {
144 http_client,
145 config,
146 })
147 }
148
149 pub fn from(&self, table: &str) -> QueryBuilder {
151 QueryBuilder::new(self.clone(), table.to_string())
152 }
153
154 pub fn insert(&self, table: &str) -> InsertBuilder {
156 InsertBuilder::new(self.clone(), table.to_string())
157 }
158
159 pub fn upsert(&self, table: &str) -> InsertBuilder {
185 InsertBuilder::new(self.clone(), table.to_string()).upsert()
186 }
187
188 pub async fn bulk_insert<T>(&self, table: &str, data: Vec<JsonValue>) -> Result<Vec<T>>
211 where
212 T: for<'de> Deserialize<'de>,
213 {
214 debug!(
215 "Executing BULK INSERT on table: {} with {} records",
216 table,
217 data.len()
218 );
219
220 let url = format!("{}/{}", self.rest_url(), table);
221 let response = self
222 .http_client
223 .post(&url)
224 .json(&data)
225 .header("Prefer", "return=representation")
226 .send()
227 .await?;
228
229 if !response.status().is_success() {
230 let status = response.status();
231 let error_msg = match response.text().await {
232 Ok(text) => text,
233 Err(_) => format!("Bulk insert failed with status: {}", status),
234 };
235 return Err(Error::database(error_msg));
236 }
237
238 let result: Vec<T> = response.json().await?;
239 info!("Bulk insert executed successfully on table: {}", table);
240 Ok(result)
241 }
242
243 pub async fn bulk_upsert<T>(&self, table: &str, data: Vec<JsonValue>) -> Result<Vec<T>>
265 where
266 T: for<'de> Deserialize<'de>,
267 {
268 debug!(
269 "Executing BULK UPSERT on table: {} with {} records",
270 table,
271 data.len()
272 );
273
274 let url = format!("{}/{}", self.rest_url(), table);
275 let response = self
276 .http_client
277 .post(&url)
278 .json(&data)
279 .header(
280 "Prefer",
281 "return=representation,resolution=merge-duplicates",
282 )
283 .send()
284 .await?;
285
286 if !response.status().is_success() {
287 let status = response.status();
288 let error_msg = match response.text().await {
289 Ok(text) => text,
290 Err(_) => format!("Bulk upsert failed with status: {}", status),
291 };
292 return Err(Error::database(error_msg));
293 }
294
295 let result: Vec<T> = response.json().await?;
296 info!("Bulk upsert executed successfully on table: {}", table);
297 Ok(result)
298 }
299
300 pub async fn raw_sql<T>(&self, sql: &str, params: JsonValue) -> Result<Vec<T>>
319 where
320 T: for<'de> Deserialize<'de>,
321 {
322 debug!("Executing raw SQL: {}", sql);
323
324 let rpc_params = json!({
326 "sql": sql,
327 "params": params
328 });
329
330 let result = self.rpc("execute_sql", Some(rpc_params)).await?;
331
332 match result {
334 JsonValue::Array(arr) => {
335 let mut results = Vec::with_capacity(arr.len());
336 for item in arr {
337 let parsed: T = serde_json::from_value(item)?;
338 results.push(parsed);
339 }
340 Ok(results)
341 }
342 _ => {
343 let parsed: T = serde_json::from_value(result)?;
345 Ok(vec![parsed])
346 }
347 }
348 }
349
350 pub async fn prepared_statement<T>(
369 &self,
370 statement_name: &str,
371 params: JsonValue,
372 ) -> Result<Vec<T>>
373 where
374 T: for<'de> Deserialize<'de>,
375 {
376 debug!("Executing prepared statement: {}", statement_name);
377
378 let result = self.rpc(statement_name, Some(params)).await?;
379
380 match result {
382 JsonValue::Array(arr) => {
383 let mut results = Vec::with_capacity(arr.len());
384 for item in arr {
385 let parsed: T = serde_json::from_value(item)?;
386 results.push(parsed);
387 }
388 Ok(results)
389 }
390 _ => {
391 let parsed: T = serde_json::from_value(result)?;
393 Ok(vec![parsed])
394 }
395 }
396 }
397
398 pub async fn count_query(&self, function_name: &str, params: JsonValue) -> Result<u64> {
417 debug!("Executing count query: {}", function_name);
418
419 let result = self.rpc(function_name, Some(params)).await?;
420
421 match result {
423 JsonValue::Number(num) => num
424 .as_u64()
425 .ok_or_else(|| Error::database("Count query returned invalid number".to_string())),
426 JsonValue::Object(obj) => obj.get("count").and_then(|v| v.as_u64()).ok_or_else(|| {
427 Error::database("Count query result missing 'count' field".to_string())
428 }),
429 _ => Err(Error::database(
430 "Count query returned unexpected result type".to_string(),
431 )),
432 }
433 }
434
435 pub async fn transaction<T>(&self, operations: Vec<JsonValue>) -> Result<Vec<T>>
466 where
467 T: for<'de> Deserialize<'de>,
468 {
469 debug!("Executing transaction with {} operations", operations.len());
470
471 let transaction_params = json!({
472 "operations": operations
473 });
474
475 let result = self
476 .rpc("execute_transaction", Some(transaction_params))
477 .await?;
478
479 match result {
481 JsonValue::Array(arr) => {
482 let mut results = Vec::with_capacity(arr.len());
483 for item in arr {
484 let parsed: T = serde_json::from_value(item)?;
485 results.push(parsed);
486 }
487 Ok(results)
488 }
489 _ => {
490 let parsed: T = serde_json::from_value(result)?;
492 Ok(vec![parsed])
493 }
494 }
495 }
496
497 pub fn begin_transaction(&self) -> TransactionBuilder {
520 TransactionBuilder::new(self.clone())
521 }
522
523 pub fn update(&self, table: &str) -> UpdateBuilder {
525 UpdateBuilder::new(self.clone(), table.to_string())
526 }
527
528 pub fn delete(&self, table: &str) -> DeleteBuilder {
530 DeleteBuilder::new(self.clone(), table.to_string())
531 }
532
533 pub async fn rpc(&self, function_name: &str, params: Option<JsonValue>) -> Result<JsonValue> {
535 debug!("Executing RPC function: {}", function_name);
536
537 let url = format!("{}/rest/v1/rpc/{}", self.config.url, function_name);
538
539 let mut request = self.http_client.post(&url);
540
541 if let Some(params) = params {
542 request = request.json(¶ms);
543 }
544
545 let response = request.send().await?;
546
547 if !response.status().is_success() {
548 let status = response.status();
549 let error_msg = match response.text().await {
550 Ok(text) => text,
551 Err(_) => format!("RPC failed with status: {}", status),
552 };
553 return Err(Error::database(error_msg));
554 }
555
556 let result: JsonValue = response.json().await?;
557 info!("RPC function {} executed successfully", function_name);
558
559 Ok(result)
560 }
561
562 fn rest_url(&self) -> String {
564 format!("{}/rest/v1", self.config.url)
565 }
566
567 fn build_query_params(&self, filters: &[Filter]) -> HashMap<String, String> {
569 let mut params = HashMap::new();
570
571 for filter in filters {
572 self.build_filter_params(filter, &mut params);
573 }
574
575 params
576 }
577
578 fn build_filter_params(&self, filter: &Filter, params: &mut HashMap<String, String>) {
580 match filter {
581 Filter::Simple {
582 column,
583 operator,
584 value,
585 } => {
586 let filter_value = match operator {
587 FilterOperator::Equal => format!("eq.{}", value),
588 FilterOperator::NotEqual => format!("neq.{}", value),
589 FilterOperator::GreaterThan => format!("gt.{}", value),
590 FilterOperator::GreaterThanOrEqual => format!("gte.{}", value),
591 FilterOperator::LessThan => format!("lt.{}", value),
592 FilterOperator::LessThanOrEqual => format!("lte.{}", value),
593 FilterOperator::Like => format!("like.{}", value),
594 FilterOperator::ILike => format!("ilike.{}", value),
595 FilterOperator::Is => format!("is.{}", value),
596 FilterOperator::In => format!("in.{}", value),
597 FilterOperator::Contains => format!("cs.{}", value),
598 FilterOperator::ContainedBy => format!("cd.{}", value),
599 FilterOperator::StrictlyLeft => format!("sl.{}", value),
600 FilterOperator::StrictlyRight => format!("sr.{}", value),
601 FilterOperator::NotExtendToRight => format!("nxr.{}", value),
602 FilterOperator::NotExtendToLeft => format!("nxl.{}", value),
603 FilterOperator::Adjacent => format!("adj.{}", value),
604 };
605
606 params.insert(column.clone(), filter_value);
607 }
608 Filter::And(filters) => {
609 for filter in filters {
611 self.build_filter_params(filter, params);
612 }
613 }
614 Filter::Or(filters) => {
615 let or_conditions: Vec<String> = filters
617 .iter()
618 .map(|f| self.build_filter_condition(f))
619 .collect();
620
621 if !or_conditions.is_empty() {
622 let or_value = format!("({})", or_conditions.join(","));
623 params.insert("or".to_string(), or_value);
624 }
625 }
626 Filter::Not(filter) => {
627 match filter.as_ref() {
629 Filter::Simple {
630 column,
631 operator,
632 value,
633 } => {
634 let filter_value = match operator {
635 FilterOperator::Equal => format!("eq.{}", value),
636 FilterOperator::NotEqual => format!("neq.{}", value),
637 FilterOperator::GreaterThan => format!("gt.{}", value),
638 FilterOperator::GreaterThanOrEqual => format!("gte.{}", value),
639 FilterOperator::LessThan => format!("lt.{}", value),
640 FilterOperator::LessThanOrEqual => format!("lte.{}", value),
641 FilterOperator::Like => format!("like.{}", value),
642 FilterOperator::ILike => format!("ilike.{}", value),
643 FilterOperator::Is => format!("is.{}", value),
644 FilterOperator::In => format!("in.{}", value),
645 FilterOperator::Contains => format!("cs.{}", value),
646 FilterOperator::ContainedBy => format!("cd.{}", value),
647 FilterOperator::StrictlyLeft => format!("sl.{}", value),
648 FilterOperator::StrictlyRight => format!("sr.{}", value),
649 FilterOperator::NotExtendToRight => format!("nxr.{}", value),
650 FilterOperator::NotExtendToLeft => format!("nxl.{}", value),
651 FilterOperator::Adjacent => format!("adj.{}", value),
652 };
653
654 params.insert(format!("not.{}", column), filter_value);
655 }
656 Filter::And(and_filters) => {
657 let and_conditions: Vec<String> = and_filters
659 .iter()
660 .map(|f| self.build_filter_condition(f))
661 .collect();
662
663 if !and_conditions.is_empty() {
664 let not_value = format!("and.({})", and_conditions.join(","));
665 params.insert("not".to_string(), not_value);
666 }
667 }
668 Filter::Or(or_filters) => {
669 let or_conditions: Vec<String> = or_filters
671 .iter()
672 .map(|f| self.build_filter_condition(f))
673 .collect();
674
675 if !or_conditions.is_empty() {
676 let not_value = format!("or.({})", or_conditions.join(","));
677 params.insert("not".to_string(), not_value);
678 }
679 }
680 Filter::Not(_) => {
681 if let Filter::Not(inner) = filter.as_ref() {
684 self.build_filter_params(inner, params);
685 }
686 }
687 }
688 }
689 }
690 }
691
692 #[allow(clippy::only_used_in_recursion)]
694 fn build_filter_condition(&self, filter: &Filter) -> String {
695 match filter {
696 Filter::Simple {
697 column,
698 operator,
699 value,
700 } => {
701 let op_str = match operator {
702 FilterOperator::Equal => "eq",
703 FilterOperator::NotEqual => "neq",
704 FilterOperator::GreaterThan => "gt",
705 FilterOperator::GreaterThanOrEqual => "gte",
706 FilterOperator::LessThan => "lt",
707 FilterOperator::LessThanOrEqual => "lte",
708 FilterOperator::Like => "like",
709 FilterOperator::ILike => "ilike",
710 FilterOperator::Is => "is",
711 FilterOperator::In => "in",
712 FilterOperator::Contains => "cs",
713 FilterOperator::ContainedBy => "cd",
714 FilterOperator::StrictlyLeft => "sl",
715 FilterOperator::StrictlyRight => "sr",
716 FilterOperator::NotExtendToRight => "nxr",
717 FilterOperator::NotExtendToLeft => "nxl",
718 FilterOperator::Adjacent => "adj",
719 };
720 format!("{}.{}.{}", column, op_str, value)
721 }
722 Filter::And(filters) => {
723 let conditions: Vec<String> = filters
724 .iter()
725 .map(|f| self.build_filter_condition(f))
726 .collect();
727 format!("and.({})", conditions.join(","))
728 }
729 Filter::Or(filters) => {
730 let conditions: Vec<String> = filters
731 .iter()
732 .map(|f| self.build_filter_condition(f))
733 .collect();
734 format!("or.({})", conditions.join(","))
735 }
736 Filter::Not(filter) => {
737 let condition = self.build_filter_condition(filter);
738 format!("not.({})", condition)
739 }
740 }
741 }
742}
743
744impl QueryBuilder {
745 fn new(database: Database, table: String) -> Self {
746 Self {
747 database,
748 table,
749 columns: None,
750 filters: Vec::new(),
751 order_by: Vec::new(),
752 limit: None,
753 offset: None,
754 single: false,
755 joins: Vec::new(),
756 }
757 }
758
759 pub fn select(mut self, columns: &str) -> Self {
761 self.columns = Some(columns.to_string());
762 self
763 }
764
765 pub fn eq(mut self, column: &str, value: &str) -> Self {
767 self.filters.push(Filter::Simple {
768 column: column.to_string(),
769 operator: FilterOperator::Equal,
770 value: value.to_string(),
771 });
772 self
773 }
774
775 pub fn neq(mut self, column: &str, value: &str) -> Self {
777 self.filters.push(Filter::Simple {
778 column: column.to_string(),
779 operator: FilterOperator::NotEqual,
780 value: value.to_string(),
781 });
782 self
783 }
784
785 pub fn gt(mut self, column: &str, value: &str) -> Self {
787 self.filters.push(Filter::Simple {
788 column: column.to_string(),
789 operator: FilterOperator::GreaterThan,
790 value: value.to_string(),
791 });
792 self
793 }
794
795 pub fn gte(mut self, column: &str, value: &str) -> Self {
797 self.filters.push(Filter::Simple {
798 column: column.to_string(),
799 operator: FilterOperator::GreaterThanOrEqual,
800 value: value.to_string(),
801 });
802 self
803 }
804
805 pub fn lt(mut self, column: &str, value: &str) -> Self {
807 self.filters.push(Filter::Simple {
808 column: column.to_string(),
809 operator: FilterOperator::LessThan,
810 value: value.to_string(),
811 });
812 self
813 }
814
815 pub fn lte(mut self, column: &str, value: &str) -> Self {
817 self.filters.push(Filter::Simple {
818 column: column.to_string(),
819 operator: FilterOperator::LessThanOrEqual,
820 value: value.to_string(),
821 });
822 self
823 }
824
825 pub fn like(mut self, column: &str, pattern: &str) -> Self {
827 self.filters.push(Filter::Simple {
828 column: column.to_string(),
829 operator: FilterOperator::Like,
830 value: pattern.to_string(),
831 });
832 self
833 }
834
835 pub fn ilike(mut self, column: &str, pattern: &str) -> Self {
837 self.filters.push(Filter::Simple {
838 column: column.to_string(),
839 operator: FilterOperator::ILike,
840 value: pattern.to_string(),
841 });
842 self
843 }
844
845 pub fn is(mut self, column: &str, value: &str) -> Self {
847 self.filters.push(Filter::Simple {
848 column: column.to_string(),
849 operator: FilterOperator::Is,
850 value: value.to_string(),
851 });
852 self
853 }
854
855 pub fn r#in(mut self, column: &str, values: &[&str]) -> Self {
857 let value = format!("({})", values.join(","));
858 self.filters.push(Filter::Simple {
859 column: column.to_string(),
860 operator: FilterOperator::In,
861 value,
862 });
863 self
864 }
865
866 pub fn order(mut self, column: &str, direction: OrderDirection) -> Self {
868 self.order_by.push(OrderBy {
869 column: column.to_string(),
870 direction,
871 nulls_first: None,
872 });
873 self
874 }
875
876 pub fn limit(mut self, limit: u32) -> Self {
878 self.limit = Some(limit);
879 self
880 }
881
882 pub fn offset(mut self, offset: u32) -> Self {
884 self.offset = Some(offset);
885 self
886 }
887
888 pub fn single(mut self) -> Self {
890 self.single = true;
891 self
892 }
893
894 pub fn and<F>(mut self, builder_fn: F) -> Self
921 where
922 F: FnOnce(QueryBuilder) -> QueryBuilder,
923 {
924 let and_builder = QueryBuilder::new(self.database.clone(), self.table.clone());
926 let built_query = builder_fn(and_builder);
927
928 if !built_query.filters.is_empty() {
929 self.filters.push(Filter::And(built_query.filters));
930 }
931
932 self
933 }
934
935 pub fn or<F>(mut self, builder_fn: F) -> Self
961 where
962 F: FnOnce(QueryBuilder) -> QueryBuilder,
963 {
964 let or_builder = QueryBuilder::new(self.database.clone(), self.table.clone());
966 let built_query = builder_fn(or_builder);
967
968 if !built_query.filters.is_empty() {
969 self.filters.push(Filter::Or(built_query.filters));
970 }
971
972 self
973 }
974
975 pub fn not<F>(mut self, builder_fn: F) -> Self
997 where
998 F: FnOnce(QueryBuilder) -> QueryBuilder,
999 {
1000 let not_builder = QueryBuilder::new(self.database.clone(), self.table.clone());
1002 let built_query = builder_fn(not_builder);
1003
1004 if !built_query.filters.is_empty() {
1005 if built_query.filters.len() == 1 {
1007 self.filters
1008 .push(Filter::Not(Box::new(built_query.filters[0].clone())));
1009 } else {
1010 self.filters
1012 .push(Filter::Not(Box::new(Filter::And(built_query.filters))));
1013 }
1014 }
1015
1016 self
1017 }
1018
1019 pub fn inner_join(mut self, foreign_table: &str, foreign_columns: &str) -> Self {
1041 self.joins.push(Join {
1042 join_type: JoinType::Inner,
1043 foreign_table: foreign_table.to_string(),
1044 foreign_columns: foreign_columns.to_string(),
1045 alias: None,
1046 });
1047 self
1048 }
1049
1050 pub fn left_join(mut self, foreign_table: &str, foreign_columns: &str) -> Self {
1072 self.joins.push(Join {
1073 join_type: JoinType::Left,
1074 foreign_table: foreign_table.to_string(),
1075 foreign_columns: foreign_columns.to_string(),
1076 alias: None,
1077 });
1078 self
1079 }
1080
1081 pub fn inner_join_as(
1103 mut self,
1104 foreign_table: &str,
1105 foreign_columns: &str,
1106 alias: &str,
1107 ) -> Self {
1108 self.joins.push(Join {
1109 join_type: JoinType::Inner,
1110 foreign_table: foreign_table.to_string(),
1111 foreign_columns: foreign_columns.to_string(),
1112 alias: Some(alias.to_string()),
1113 });
1114 self
1115 }
1116
1117 pub fn left_join_as(mut self, foreign_table: &str, foreign_columns: &str, alias: &str) -> Self {
1139 self.joins.push(Join {
1140 join_type: JoinType::Left,
1141 foreign_table: foreign_table.to_string(),
1142 foreign_columns: foreign_columns.to_string(),
1143 alias: Some(alias.to_string()),
1144 });
1145 self
1146 }
1147
1148 pub async fn execute<T>(&self) -> Result<Vec<T>>
1150 where
1151 T: for<'de> Deserialize<'de>,
1152 {
1153 debug!("Executing SELECT query on table: {}", self.table);
1154
1155 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
1156
1157 let mut query_params = self.database.build_query_params(&self.filters);
1159
1160 let select_clause = self.build_select_with_joins();
1162 query_params.insert("select".to_string(), select_clause);
1163
1164 if !self.order_by.is_empty() {
1165 let order_clauses: Vec<String> = self
1166 .order_by
1167 .iter()
1168 .map(|order| {
1169 let direction = match order.direction {
1170 OrderDirection::Ascending => "asc",
1171 OrderDirection::Descending => "desc",
1172 };
1173 format!("{}.{}", order.column, direction)
1174 })
1175 .collect();
1176 query_params.insert("order".to_string(), order_clauses.join(","));
1177 }
1178
1179 if let Some(limit) = self.limit {
1180 query_params.insert("limit".to_string(), limit.to_string());
1181 }
1182
1183 if let Some(offset) = self.offset {
1184 query_params.insert("offset".to_string(), offset.to_string());
1185 }
1186
1187 for (key, value) in query_params {
1189 url.query_pairs_mut().append_pair(&key, &value);
1190 }
1191
1192 debug!("Generated query URL: {}", url.as_str());
1193 let mut request = self.database.http_client.get(url.as_str());
1194
1195 if self.single {
1196 request = request.header("Accept", "application/vnd.pgrst.object+json");
1197 }
1198
1199 let response = request.send().await?;
1200
1201 if !response.status().is_success() {
1202 let status = response.status();
1203 let error_msg = match response.text().await {
1204 Ok(text) => text,
1205 Err(_) => format!("Query failed with status: {}", status),
1206 };
1207 return Err(Error::database(error_msg));
1208 }
1209
1210 let result = if self.single {
1211 let single_item: T = response.json().await?;
1212 vec![single_item]
1213 } else {
1214 response.json().await?
1215 };
1216
1217 info!(
1218 "SELECT query executed successfully on table: {}",
1219 self.table
1220 );
1221 Ok(result)
1222 }
1223
1224 fn build_select_with_joins(&self) -> String {
1226 let base_columns = self.columns.as_deref().unwrap_or("*");
1227
1228 if self.joins.is_empty() {
1229 return base_columns.to_string();
1230 }
1231
1232 let mut select_parts = vec![base_columns.to_string()];
1233
1234 for join in &self.joins {
1235 let join_clause = self.build_join_clause(join);
1236 select_parts.push(join_clause);
1237 }
1238
1239 select_parts.join(",")
1240 }
1241
1242 fn build_join_clause(&self, join: &Join) -> String {
1244 match (&join.alias, &join.join_type) {
1245 (Some(alias), JoinType::Inner) => {
1246 format!(
1248 "{}:{}!inner({})",
1249 alias, join.foreign_table, join.foreign_columns
1250 )
1251 }
1252 (Some(alias), JoinType::Left) => {
1253 format!("{}:{}({})", alias, join.foreign_table, join.foreign_columns)
1255 }
1256 (None, JoinType::Inner) => {
1257 format!("{}!inner({})", join.foreign_table, join.foreign_columns)
1259 }
1260 (None, JoinType::Left) => {
1261 format!("{}({})", join.foreign_table, join.foreign_columns)
1263 }
1264 }
1265 }
1266
1267 pub async fn single_execute<T>(&self) -> Result<Option<T>>
1269 where
1270 T: for<'de> Deserialize<'de>,
1271 {
1272 let mut builder = self.clone();
1273 builder.single = true;
1274
1275 let results = builder.execute().await?;
1276 Ok(results.into_iter().next())
1277 }
1278}
1279
1280impl InsertBuilder {
1281 fn new(database: Database, table: String) -> Self {
1282 Self {
1283 database,
1284 table,
1285 data: JsonValue::Null,
1286 upsert: false,
1287 on_conflict: None,
1288 returning: None,
1289 }
1290 }
1291
1292 pub fn values<T: Serialize>(mut self, data: T) -> Result<Self> {
1294 self.data = serde_json::to_value(data)?;
1295 Ok(self)
1296 }
1297
1298 pub fn upsert(mut self) -> Self {
1300 self.upsert = true;
1301 self
1302 }
1303
1304 pub fn on_conflict(mut self, columns: &str) -> Self {
1306 self.on_conflict = Some(columns.to_string());
1307 self
1308 }
1309
1310 pub fn returning(mut self, columns: &str) -> Self {
1312 self.returning = Some(columns.to_string());
1313 self
1314 }
1315
1316 pub async fn execute<T>(&self) -> Result<Vec<T>>
1318 where
1319 T: for<'de> Deserialize<'de>,
1320 {
1321 debug!("Executing INSERT query on table: {}", self.table);
1322
1323 let url = format!("{}/{}", self.database.rest_url(), self.table);
1324 let mut request = self.database.http_client.post(&url).json(&self.data);
1325
1326 if let Some(ref _returning) = self.returning {
1327 request = request.header("Prefer", "return=representation".to_string());
1328 }
1329
1330 if self.upsert {
1331 request = request.header("Prefer", "resolution=merge-duplicates");
1332 }
1333
1334 let response = request.send().await?;
1335
1336 if !response.status().is_success() {
1337 let status = response.status();
1338 let error_msg = match response.text().await {
1339 Ok(text) => text,
1340 Err(_) => format!("Insert failed with status: {}", status),
1341 };
1342 return Err(Error::database(error_msg));
1343 }
1344
1345 let result: Vec<T> = response.json().await?;
1346 info!(
1347 "INSERT query executed successfully on table: {}",
1348 self.table
1349 );
1350
1351 Ok(result)
1352 }
1353}
1354
1355impl UpdateBuilder {
1356 fn new(database: Database, table: String) -> Self {
1357 Self {
1358 database,
1359 table,
1360 data: JsonValue::Null,
1361 filters: Vec::new(),
1362 returning: None,
1363 }
1364 }
1365
1366 pub fn set<T: Serialize>(mut self, data: T) -> Result<Self> {
1368 self.data = serde_json::to_value(data)?;
1369 Ok(self)
1370 }
1371
1372 pub fn eq(mut self, column: &str, value: &str) -> Self {
1374 self.filters.push(Filter::Simple {
1375 column: column.to_string(),
1376 operator: FilterOperator::Equal,
1377 value: value.to_string(),
1378 });
1379 self
1380 }
1381
1382 pub fn returning(mut self, columns: &str) -> Self {
1384 self.returning = Some(columns.to_string());
1385 self
1386 }
1387
1388 pub async fn execute<T>(&self) -> Result<Vec<T>>
1390 where
1391 T: for<'de> Deserialize<'de>,
1392 {
1393 debug!("Executing UPDATE query on table: {}", self.table);
1394
1395 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
1396
1397 let query_params = self.database.build_query_params(&self.filters);
1399 for (key, value) in query_params {
1400 url.query_pairs_mut().append_pair(&key, &value);
1401 }
1402
1403 let mut request = self
1404 .database
1405 .http_client
1406 .patch(url.as_str())
1407 .json(&self.data);
1408
1409 if let Some(ref _returning) = self.returning {
1410 request = request.header("Prefer", "return=representation");
1411 }
1412
1413 let response = request.send().await?;
1414
1415 if !response.status().is_success() {
1416 let status = response.status();
1417 let error_msg = match response.text().await {
1418 Ok(text) => text,
1419 Err(_) => format!("Update failed with status: {}", status),
1420 };
1421 return Err(Error::database(error_msg));
1422 }
1423
1424 let result: Vec<T> = response.json().await?;
1425 info!(
1426 "UPDATE query executed successfully on table: {}",
1427 self.table
1428 );
1429
1430 Ok(result)
1431 }
1432}
1433
1434impl DeleteBuilder {
1435 fn new(database: Database, table: String) -> Self {
1436 Self {
1437 database,
1438 table,
1439 filters: Vec::new(),
1440 returning: None,
1441 }
1442 }
1443
1444 pub fn eq(mut self, column: &str, value: &str) -> Self {
1446 self.filters.push(Filter::Simple {
1447 column: column.to_string(),
1448 operator: FilterOperator::Equal,
1449 value: value.to_string(),
1450 });
1451 self
1452 }
1453
1454 pub fn returning(mut self, columns: &str) -> Self {
1456 self.returning = Some(columns.to_string());
1457 self
1458 }
1459
1460 pub async fn execute<T>(&self) -> Result<Vec<T>>
1462 where
1463 T: for<'de> Deserialize<'de>,
1464 {
1465 debug!("Executing DELETE query on table: {}", self.table);
1466
1467 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
1468
1469 let query_params = self.database.build_query_params(&self.filters);
1471 for (key, value) in query_params {
1472 url.query_pairs_mut().append_pair(&key, &value);
1473 }
1474
1475 let mut request = self.database.http_client.delete(url.as_str());
1476
1477 if let Some(ref _returning) = self.returning {
1478 request = request.header("Prefer", "return=representation");
1479 }
1480
1481 let response = request.send().await?;
1482
1483 if !response.status().is_success() {
1484 let status = response.status();
1485 let error_msg = match response.text().await {
1486 Ok(text) => text,
1487 Err(_) => format!("Delete failed with status: {}", status),
1488 };
1489 return Err(Error::database(error_msg));
1490 }
1491
1492 let result: Vec<T> = response.json().await?;
1493 info!(
1494 "DELETE query executed successfully on table: {}",
1495 self.table
1496 );
1497
1498 Ok(result)
1499 }
1500}
1501
1502impl TransactionBuilder {
1503 fn new(database: Database) -> Self {
1504 Self {
1505 database,
1506 operations: Vec::new(),
1507 }
1508 }
1509
1510 pub fn insert(mut self, table: &str, data: JsonValue) -> Self {
1512 self.operations.push(json!({
1513 "operation": TransactionOperation::Insert,
1514 "table": table,
1515 "data": data
1516 }));
1517 self
1518 }
1519
1520 pub fn update(mut self, table: &str, data: JsonValue, where_clause: &str) -> Self {
1522 self.operations.push(json!({
1523 "operation": TransactionOperation::Update,
1524 "table": table,
1525 "data": data,
1526 "where": where_clause
1527 }));
1528 self
1529 }
1530
1531 pub fn delete(mut self, table: &str, where_clause: &str) -> Self {
1533 self.operations.push(json!({
1534 "operation": TransactionOperation::Delete,
1535 "table": table,
1536 "where": where_clause
1537 }));
1538 self
1539 }
1540
1541 pub fn select(mut self, table: &str, columns: &str, where_clause: Option<&str>) -> Self {
1543 let mut operation = json!({
1544 "operation": TransactionOperation::Select,
1545 "table": table,
1546 "columns": columns
1547 });
1548
1549 if let Some(where_clause) = where_clause {
1550 operation["where"] = json!(where_clause);
1551 }
1552
1553 self.operations.push(operation);
1554 self
1555 }
1556
1557 pub fn rpc(mut self, function_name: &str, params: JsonValue) -> Self {
1559 self.operations.push(json!({
1560 "operation": TransactionOperation::Rpc,
1561 "function": function_name,
1562 "params": params
1563 }));
1564 self
1565 }
1566
1567 pub async fn commit<T>(self) -> Result<Vec<T>>
1569 where
1570 T: for<'de> Deserialize<'de>,
1571 {
1572 if self.operations.is_empty() {
1573 return Ok(Vec::new());
1574 }
1575
1576 self.database.transaction(self.operations).await
1577 }
1578
1579 pub fn len(&self) -> usize {
1581 self.operations.len()
1582 }
1583
1584 pub fn is_empty(&self) -> bool {
1586 self.operations.is_empty()
1587 }
1588}
1589
1590#[cfg(test)]
1591mod tests {
1592 use super::*;
1593
1594 #[test]
1595 fn test_logical_operators() {
1596 let and_filter = Filter::And(vec![
1598 Filter::Simple {
1599 column: "age".to_string(),
1600 operator: FilterOperator::GreaterThanOrEqual,
1601 value: "18".to_string(),
1602 },
1603 Filter::Simple {
1604 column: "status".to_string(),
1605 operator: FilterOperator::Equal,
1606 value: "active".to_string(),
1607 },
1608 ]);
1609
1610 let or_filter = Filter::Or(vec![
1612 Filter::Simple {
1613 column: "role".to_string(),
1614 operator: FilterOperator::Equal,
1615 value: "admin".to_string(),
1616 },
1617 Filter::Simple {
1618 column: "role".to_string(),
1619 operator: FilterOperator::Equal,
1620 value: "moderator".to_string(),
1621 },
1622 ]);
1623
1624 let not_filter = Filter::Not(Box::new(Filter::Simple {
1626 column: "status".to_string(),
1627 operator: FilterOperator::Equal,
1628 value: "banned".to_string(),
1629 }));
1630
1631 assert!(matches!(and_filter, Filter::And(_)));
1633 assert!(matches!(or_filter, Filter::Or(_)));
1634 assert!(matches!(not_filter, Filter::Not(_)));
1635 }
1636
1637 #[test]
1638 fn test_filter_condition_generation() {
1639 use crate::types::SupabaseConfig;
1640 use reqwest::Client as HttpClient;
1641 use std::sync::Arc;
1642
1643 let config = Arc::new(SupabaseConfig::default());
1644 let http_client = Arc::new(HttpClient::new());
1645 let db = Database::new(config, http_client).unwrap();
1646
1647 let simple_filter = Filter::Simple {
1649 column: "age".to_string(),
1650 operator: FilterOperator::Equal,
1651 value: "25".to_string(),
1652 };
1653 let condition = db.build_filter_condition(&simple_filter);
1654 assert_eq!(condition, "age.eq.25");
1655
1656 let or_filter = Filter::Or(vec![
1658 Filter::Simple {
1659 column: "status".to_string(),
1660 operator: FilterOperator::Equal,
1661 value: "online".to_string(),
1662 },
1663 Filter::Simple {
1664 column: "status".to_string(),
1665 operator: FilterOperator::Equal,
1666 value: "away".to_string(),
1667 },
1668 ]);
1669 let condition = db.build_filter_condition(&or_filter);
1670 assert_eq!(condition, "or.(status.eq.online,status.eq.away)");
1671
1672 let not_filter = Filter::Not(Box::new(Filter::Simple {
1674 column: "banned".to_string(),
1675 operator: FilterOperator::Equal,
1676 value: "true".to_string(),
1677 }));
1678 let condition = db.build_filter_condition(¬_filter);
1679 assert_eq!(condition, "not.(banned.eq.true)");
1680 }
1681
1682 #[test]
1683 fn test_query_builder_logical_methods() {
1684 use crate::types::SupabaseConfig;
1685 use reqwest::Client as HttpClient;
1686 use std::sync::Arc;
1687
1688 let config = Arc::new(SupabaseConfig::default());
1689 let http_client = Arc::new(HttpClient::new());
1690 let db = Database::new(config, http_client).unwrap();
1691
1692 let query = db
1694 .from("users")
1695 .select("*")
1696 .and(|q| q.gte("age", "18").eq("status", "active"));
1697
1698 assert_eq!(query.filters.len(), 1);
1700 assert!(matches!(query.filters[0], Filter::And(_)));
1701
1702 let query = db
1704 .from("users")
1705 .select("*")
1706 .or(|q| q.eq("role", "admin").eq("role", "mod"));
1707
1708 assert_eq!(query.filters.len(), 1);
1710 assert!(matches!(query.filters[0], Filter::Or(_)));
1711
1712 let query = db.from("users").select("*").not(|q| q.eq("banned", "true"));
1714
1715 assert_eq!(query.filters.len(), 1);
1717 assert!(matches!(query.filters[0], Filter::Not(_)));
1718 }
1719
1720 #[test]
1721 fn test_join_functionality() {
1722 use crate::types::SupabaseConfig;
1723 use reqwest::Client as HttpClient;
1724 use std::sync::Arc;
1725
1726 let config = Arc::new(SupabaseConfig::default());
1727 let http_client = Arc::new(HttpClient::new());
1728 let db = Database::new(config, http_client).unwrap();
1729
1730 let query = db
1732 .from("posts")
1733 .select("title,content")
1734 .inner_join("authors", "name,email");
1735
1736 assert_eq!(query.joins.len(), 1);
1737 let join = &query.joins[0];
1738 assert!(matches!(join.join_type, JoinType::Inner));
1739 assert_eq!(join.foreign_table, "authors");
1740 assert_eq!(join.foreign_columns, "name,email");
1741 assert!(join.alias.is_none());
1742
1743 let query = db
1745 .from("posts")
1746 .select("*")
1747 .left_join_as("authors", "name", "author");
1748
1749 assert_eq!(query.joins.len(), 1);
1750 let join = &query.joins[0];
1751 assert!(matches!(join.join_type, JoinType::Left));
1752 assert_eq!(join.foreign_table, "authors");
1753 assert_eq!(join.foreign_columns, "name");
1754 assert_eq!(join.alias.as_ref().unwrap(), "author");
1755 }
1756
1757 #[test]
1758 fn test_join_clause_generation() {
1759 use crate::types::SupabaseConfig;
1760 use reqwest::Client as HttpClient;
1761 use std::sync::Arc;
1762
1763 let config = Arc::new(SupabaseConfig::default());
1764 let http_client = Arc::new(HttpClient::new());
1765 let db = Database::new(config, http_client).unwrap();
1766 let query = QueryBuilder::new(db, "posts".to_string());
1767
1768 let inner_join = Join {
1770 join_type: JoinType::Inner,
1771 foreign_table: "authors".to_string(),
1772 foreign_columns: "name,email".to_string(),
1773 alias: None,
1774 };
1775 let clause = query.build_join_clause(&inner_join);
1776 assert_eq!(clause, "authors!inner(name,email)");
1777
1778 let left_join = Join {
1780 join_type: JoinType::Left,
1781 foreign_table: "authors".to_string(),
1782 foreign_columns: "name".to_string(),
1783 alias: Some("author".to_string()),
1784 };
1785 let clause = query.build_join_clause(&left_join);
1786 assert_eq!(clause, "author:authors(name)");
1787
1788 let inner_join_alias = Join {
1790 join_type: JoinType::Inner,
1791 foreign_table: "categories".to_string(),
1792 foreign_columns: "name,description".to_string(),
1793 alias: Some("category".to_string()),
1794 };
1795 let clause = query.build_join_clause(&inner_join_alias);
1796 assert_eq!(clause, "category:categories!inner(name,description)");
1797 }
1798
1799 #[test]
1800 fn test_select_with_joins() {
1801 use crate::types::SupabaseConfig;
1802 use reqwest::Client as HttpClient;
1803 use std::sync::Arc;
1804
1805 let config = Arc::new(SupabaseConfig::default());
1806 let http_client = Arc::new(HttpClient::new());
1807 let db = Database::new(config, http_client).unwrap();
1808
1809 let query = db
1811 .from("posts")
1812 .select("title,content")
1813 .inner_join("authors", "name,email")
1814 .left_join_as("categories", "name", "category");
1815
1816 let select_clause = query.build_select_with_joins();
1817 assert_eq!(
1818 select_clause,
1819 "title,content,authors!inner(name,email),category:categories(name)"
1820 );
1821
1822 let query = db.from("posts").inner_join("authors", "name");
1824
1825 let select_clause = query.build_select_with_joins();
1826 assert_eq!(select_clause, "*,authors!inner(name)");
1827 }
1828
1829 #[test]
1830 fn test_upsert_functionality() {
1831 use crate::types::SupabaseConfig;
1832 use reqwest::Client as HttpClient;
1833 use std::sync::Arc;
1834
1835 let config = Arc::new(SupabaseConfig::default());
1836 let http_client = Arc::new(HttpClient::new());
1837 let db = Database::new(config, http_client).unwrap();
1838
1839 let builder = db.upsert("users");
1841 assert!(builder.upsert);
1842 assert!(builder.on_conflict.is_none());
1843
1844 let builder = db.insert("users").upsert().on_conflict("id");
1846 assert!(builder.upsert);
1847 assert_eq!(builder.on_conflict.as_ref().unwrap(), "id");
1848 }
1849
1850 #[test]
1851 fn test_insert_builder_methods() {
1852 use crate::types::SupabaseConfig;
1853 use reqwest::Client as HttpClient;
1854 use std::sync::Arc;
1855
1856 let config = Arc::new(SupabaseConfig::default());
1857 let http_client = Arc::new(HttpClient::new());
1858 let db = Database::new(config, http_client).unwrap();
1859
1860 let builder = db.insert("users");
1862 assert!(!builder.upsert);
1863 assert!(builder.on_conflict.is_none());
1864
1865 let builder = db.insert("users").upsert();
1867 assert!(builder.upsert);
1868
1869 let builder = db.insert("users").upsert().on_conflict("email,id");
1871 assert!(builder.upsert);
1872 assert_eq!(builder.on_conflict.as_ref().unwrap(), "email,id");
1873 }
1874
1875 #[test]
1876 fn test_transaction_builder() {
1877 use crate::types::SupabaseConfig;
1878 use reqwest::Client as HttpClient;
1879 use std::sync::Arc;
1880
1881 let config = Arc::new(SupabaseConfig::default());
1882 let http_client = Arc::new(HttpClient::new());
1883 let db = Database::new(config, http_client).unwrap();
1884
1885 let tx = db.begin_transaction();
1887 assert!(tx.is_empty());
1888 assert_eq!(tx.len(), 0);
1889
1890 let tx = db
1892 .begin_transaction()
1893 .insert("users", json!({"name": "Alice"}))
1894 .update("profiles", json!({"bio": "Updated"}), "user_id = 1")
1895 .delete("logs", "created_at < '2023-01-01'")
1896 .select("settings", "*", Some("user_id = 1"))
1897 .rpc("calculate_stats", json!({"param": "value"}));
1898
1899 assert!(!tx.is_empty());
1900 assert_eq!(tx.len(), 5);
1901
1902 assert_eq!(tx.operations[0]["operation"], "insert");
1904 assert_eq!(tx.operations[1]["operation"], "update");
1905 assert_eq!(tx.operations[2]["operation"], "delete");
1906 assert_eq!(tx.operations[3]["operation"], "select");
1907 assert_eq!(tx.operations[4]["operation"], "rpc");
1908 }
1909
1910 #[test]
1911 fn test_transaction_operation_data() {
1912 use crate::types::SupabaseConfig;
1913 use reqwest::Client as HttpClient;
1914 use std::sync::Arc;
1915
1916 let config = Arc::new(SupabaseConfig::default());
1917 let http_client = Arc::new(HttpClient::new());
1918 let db = Database::new(config, http_client).unwrap();
1919
1920 let tx = db
1921 .begin_transaction()
1922 .insert("users", json!({"name": "Bob", "email": "bob@example.com"}))
1923 .update("users", json!({"status": "active"}), "id = 1");
1924
1925 let insert_op = &tx.operations[0];
1927 assert_eq!(insert_op["table"], "users");
1928 assert_eq!(insert_op["data"]["name"], "Bob");
1929 assert_eq!(insert_op["data"]["email"], "bob@example.com");
1930
1931 let update_op = &tx.operations[1];
1933 assert_eq!(update_op["table"], "users");
1934 assert_eq!(update_op["data"]["status"], "active");
1935 assert_eq!(update_op["where"], "id = 1");
1936 }
1937}