1use crate::{
4 error::{Error, Result},
5 types::{FilterOperator, JsonValue, OrderDirection, SupabaseConfig},
6};
7use reqwest::Client as HttpClient;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, sync::Arc};
10use tracing::{debug, info};
11use url::Url;
12
13#[derive(Debug, Clone)]
15pub struct Database {
16 http_client: Arc<HttpClient>,
17 config: Arc<SupabaseConfig>,
18}
19
20#[derive(Debug, Clone)]
22pub struct QueryBuilder {
23 database: Database,
24 table: String,
25 columns: Option<String>,
26 filters: Vec<Filter>,
27 order_by: Vec<OrderBy>,
28 limit: Option<u32>,
29 offset: Option<u32>,
30 single: bool,
31}
32
33#[derive(Debug, Clone)]
35pub struct InsertBuilder {
36 database: Database,
37 table: String,
38 data: JsonValue,
39 upsert: bool,
40 on_conflict: Option<String>,
41 returning: Option<String>,
42}
43
44#[derive(Debug, Clone)]
46pub struct UpdateBuilder {
47 database: Database,
48 table: String,
49 data: JsonValue,
50 filters: Vec<Filter>,
51 returning: Option<String>,
52}
53
54#[derive(Debug, Clone)]
56pub struct DeleteBuilder {
57 database: Database,
58 table: String,
59 filters: Vec<Filter>,
60 returning: Option<String>,
61}
62
63#[derive(Debug, Clone)]
65struct Filter {
66 column: String,
67 operator: FilterOperator,
68 value: String,
69}
70
71#[derive(Debug, Clone)]
73struct OrderBy {
74 column: String,
75 direction: OrderDirection,
76 #[allow(dead_code)]
77 nulls_first: Option<bool>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct DatabaseResponse<T> {
83 pub data: T,
84 pub count: Option<u64>,
85}
86
87impl Database {
88 pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
90 debug!("Initializing Database module");
91
92 Ok(Self {
93 http_client,
94 config,
95 })
96 }
97
98 pub fn from(&self, table: &str) -> QueryBuilder {
100 QueryBuilder::new(self.clone(), table.to_string())
101 }
102
103 pub fn insert(&self, table: &str) -> InsertBuilder {
105 InsertBuilder::new(self.clone(), table.to_string())
106 }
107
108 pub fn update(&self, table: &str) -> UpdateBuilder {
110 UpdateBuilder::new(self.clone(), table.to_string())
111 }
112
113 pub fn delete(&self, table: &str) -> DeleteBuilder {
115 DeleteBuilder::new(self.clone(), table.to_string())
116 }
117
118 pub async fn rpc(&self, function_name: &str, params: Option<JsonValue>) -> Result<JsonValue> {
120 debug!("Executing RPC function: {}", function_name);
121
122 let url = format!("{}/rest/v1/rpc/{}", self.config.url, function_name);
123
124 let mut request = self.http_client.post(&url);
125
126 if let Some(params) = params {
127 request = request.json(¶ms);
128 }
129
130 let response = request.send().await?;
131
132 if !response.status().is_success() {
133 let status = response.status();
134 let error_msg = match response.text().await {
135 Ok(text) => text,
136 Err(_) => format!("RPC failed with status: {}", status),
137 };
138 return Err(Error::database(error_msg));
139 }
140
141 let result: JsonValue = response.json().await?;
142 info!("RPC function {} executed successfully", function_name);
143
144 Ok(result)
145 }
146
147 fn rest_url(&self) -> String {
149 format!("{}/rest/v1", self.config.url)
150 }
151
152 fn build_query_params(&self, filters: &[Filter]) -> HashMap<String, String> {
154 let mut params = HashMap::new();
155
156 for filter in filters {
157 let key = match filter.operator {
158 FilterOperator::Equal => filter.column.clone(),
159 FilterOperator::NotEqual => format!("{}:neq", filter.column),
160 FilterOperator::GreaterThan => format!("{}:gt", filter.column),
161 FilterOperator::GreaterThanOrEqual => format!("{}:gte", filter.column),
162 FilterOperator::LessThan => format!("{}:lt", filter.column),
163 FilterOperator::LessThanOrEqual => format!("{}:lte", filter.column),
164 FilterOperator::Like => format!("{}:like", filter.column),
165 FilterOperator::ILike => format!("{}:ilike", filter.column),
166 FilterOperator::Is => format!("{}:is", filter.column),
167 FilterOperator::In => format!("{}:in", filter.column),
168 FilterOperator::Contains => format!("{}:cs", filter.column),
169 FilterOperator::ContainedBy => format!("{}:cd", filter.column),
170 FilterOperator::StrictlyLeft => format!("{}:sl", filter.column),
171 FilterOperator::StrictlyRight => format!("{}:sr", filter.column),
172 FilterOperator::NotExtendToRight => format!("{}:nxr", filter.column),
173 FilterOperator::NotExtendToLeft => format!("{}:nxl", filter.column),
174 FilterOperator::Adjacent => format!("{}:adj", filter.column),
175 };
176
177 params.insert(key, filter.value.clone());
178 }
179
180 params
181 }
182}
183
184impl QueryBuilder {
185 fn new(database: Database, table: String) -> Self {
186 Self {
187 database,
188 table,
189 columns: None,
190 filters: Vec::new(),
191 order_by: Vec::new(),
192 limit: None,
193 offset: None,
194 single: false,
195 }
196 }
197
198 pub fn select(mut self, columns: &str) -> Self {
200 self.columns = Some(columns.to_string());
201 self
202 }
203
204 pub fn eq(mut self, column: &str, value: &str) -> Self {
206 self.filters.push(Filter {
207 column: column.to_string(),
208 operator: FilterOperator::Equal,
209 value: value.to_string(),
210 });
211 self
212 }
213
214 pub fn neq(mut self, column: &str, value: &str) -> Self {
216 self.filters.push(Filter {
217 column: column.to_string(),
218 operator: FilterOperator::NotEqual,
219 value: value.to_string(),
220 });
221 self
222 }
223
224 pub fn gt(mut self, column: &str, value: &str) -> Self {
226 self.filters.push(Filter {
227 column: column.to_string(),
228 operator: FilterOperator::GreaterThan,
229 value: value.to_string(),
230 });
231 self
232 }
233
234 pub fn gte(mut self, column: &str, value: &str) -> Self {
236 self.filters.push(Filter {
237 column: column.to_string(),
238 operator: FilterOperator::GreaterThanOrEqual,
239 value: value.to_string(),
240 });
241 self
242 }
243
244 pub fn lt(mut self, column: &str, value: &str) -> Self {
246 self.filters.push(Filter {
247 column: column.to_string(),
248 operator: FilterOperator::LessThan,
249 value: value.to_string(),
250 });
251 self
252 }
253
254 pub fn lte(mut self, column: &str, value: &str) -> Self {
256 self.filters.push(Filter {
257 column: column.to_string(),
258 operator: FilterOperator::LessThanOrEqual,
259 value: value.to_string(),
260 });
261 self
262 }
263
264 pub fn like(mut self, column: &str, pattern: &str) -> Self {
266 self.filters.push(Filter {
267 column: column.to_string(),
268 operator: FilterOperator::Like,
269 value: pattern.to_string(),
270 });
271 self
272 }
273
274 pub fn ilike(mut self, column: &str, pattern: &str) -> Self {
276 self.filters.push(Filter {
277 column: column.to_string(),
278 operator: FilterOperator::ILike,
279 value: pattern.to_string(),
280 });
281 self
282 }
283
284 pub fn is(mut self, column: &str, value: &str) -> Self {
286 self.filters.push(Filter {
287 column: column.to_string(),
288 operator: FilterOperator::Is,
289 value: value.to_string(),
290 });
291 self
292 }
293
294 pub fn r#in(mut self, column: &str, values: &[&str]) -> Self {
296 let value = format!("({})", values.join(","));
297 self.filters.push(Filter {
298 column: column.to_string(),
299 operator: FilterOperator::In,
300 value,
301 });
302 self
303 }
304
305 pub fn order(mut self, column: &str, direction: OrderDirection) -> Self {
307 self.order_by.push(OrderBy {
308 column: column.to_string(),
309 direction,
310 nulls_first: None,
311 });
312 self
313 }
314
315 pub fn limit(mut self, limit: u32) -> Self {
317 self.limit = Some(limit);
318 self
319 }
320
321 pub fn offset(mut self, offset: u32) -> Self {
323 self.offset = Some(offset);
324 self
325 }
326
327 pub fn single(mut self) -> Self {
329 self.single = true;
330 self
331 }
332
333 pub async fn execute<T>(&self) -> Result<Vec<T>>
335 where
336 T: for<'de> Deserialize<'de>,
337 {
338 debug!("Executing SELECT query on table: {}", self.table);
339
340 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
341
342 let mut query_params = self.database.build_query_params(&self.filters);
344
345 if let Some(ref columns) = self.columns {
346 query_params.insert("select".to_string(), columns.clone());
347 }
348
349 if !self.order_by.is_empty() {
350 let order_clauses: Vec<String> = self
351 .order_by
352 .iter()
353 .map(|order| {
354 let direction = match order.direction {
355 OrderDirection::Ascending => "asc",
356 OrderDirection::Descending => "desc",
357 };
358 format!("{}.{}", order.column, direction)
359 })
360 .collect();
361 query_params.insert("order".to_string(), order_clauses.join(","));
362 }
363
364 if let Some(limit) = self.limit {
365 query_params.insert("limit".to_string(), limit.to_string());
366 }
367
368 if let Some(offset) = self.offset {
369 query_params.insert("offset".to_string(), offset.to_string());
370 }
371
372 for (key, value) in query_params {
374 url.query_pairs_mut().append_pair(&key, &value);
375 }
376
377 let mut request = self.database.http_client.get(url.as_str());
378
379 if self.single {
380 request = request.header("Accept", "application/vnd.pgrst.object+json");
381 }
382
383 let response = request.send().await?;
384
385 if !response.status().is_success() {
386 let status = response.status();
387 let error_msg = match response.text().await {
388 Ok(text) => text,
389 Err(_) => format!("Query failed with status: {}", status),
390 };
391 return Err(Error::database(error_msg));
392 }
393
394 let result = if self.single {
395 let single_item: T = response.json().await?;
396 vec![single_item]
397 } else {
398 response.json().await?
399 };
400
401 info!(
402 "SELECT query executed successfully on table: {}",
403 self.table
404 );
405 Ok(result)
406 }
407
408 pub async fn single_execute<T>(&self) -> Result<Option<T>>
410 where
411 T: for<'de> Deserialize<'de>,
412 {
413 let mut builder = self.clone();
414 builder.single = true;
415
416 let results = builder.execute().await?;
417 Ok(results.into_iter().next())
418 }
419}
420
421impl InsertBuilder {
422 fn new(database: Database, table: String) -> Self {
423 Self {
424 database,
425 table,
426 data: JsonValue::Null,
427 upsert: false,
428 on_conflict: None,
429 returning: None,
430 }
431 }
432
433 pub fn values<T: Serialize>(mut self, data: T) -> Result<Self> {
435 self.data = serde_json::to_value(data)?;
436 Ok(self)
437 }
438
439 pub fn upsert(mut self) -> Self {
441 self.upsert = true;
442 self
443 }
444
445 pub fn on_conflict(mut self, columns: &str) -> Self {
447 self.on_conflict = Some(columns.to_string());
448 self
449 }
450
451 pub fn returning(mut self, columns: &str) -> Self {
453 self.returning = Some(columns.to_string());
454 self
455 }
456
457 pub async fn execute<T>(&self) -> Result<Vec<T>>
459 where
460 T: for<'de> Deserialize<'de>,
461 {
462 debug!("Executing INSERT query on table: {}", self.table);
463
464 let url = format!("{}/{}", self.database.rest_url(), self.table);
465 let mut request = self.database.http_client.post(&url).json(&self.data);
466
467 if let Some(ref _returning) = self.returning {
468 request = request.header("Prefer", "return=representation".to_string());
469 }
470
471 if self.upsert {
472 request = request.header("Prefer", "resolution=merge-duplicates");
473 }
474
475 let response = request.send().await?;
476
477 if !response.status().is_success() {
478 let status = response.status();
479 let error_msg = match response.text().await {
480 Ok(text) => text,
481 Err(_) => format!("Insert failed with status: {}", status),
482 };
483 return Err(Error::database(error_msg));
484 }
485
486 let result: Vec<T> = response.json().await?;
487 info!(
488 "INSERT query executed successfully on table: {}",
489 self.table
490 );
491
492 Ok(result)
493 }
494}
495
496impl UpdateBuilder {
497 fn new(database: Database, table: String) -> Self {
498 Self {
499 database,
500 table,
501 data: JsonValue::Null,
502 filters: Vec::new(),
503 returning: None,
504 }
505 }
506
507 pub fn set<T: Serialize>(mut self, data: T) -> Result<Self> {
509 self.data = serde_json::to_value(data)?;
510 Ok(self)
511 }
512
513 pub fn eq(mut self, column: &str, value: &str) -> Self {
515 self.filters.push(Filter {
516 column: column.to_string(),
517 operator: FilterOperator::Equal,
518 value: value.to_string(),
519 });
520 self
521 }
522
523 pub fn returning(mut self, columns: &str) -> Self {
525 self.returning = Some(columns.to_string());
526 self
527 }
528
529 pub async fn execute<T>(&self) -> Result<Vec<T>>
531 where
532 T: for<'de> Deserialize<'de>,
533 {
534 debug!("Executing UPDATE query on table: {}", self.table);
535
536 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
537
538 let query_params = self.database.build_query_params(&self.filters);
540 for (key, value) in query_params {
541 url.query_pairs_mut().append_pair(&key, &value);
542 }
543
544 let mut request = self
545 .database
546 .http_client
547 .patch(url.as_str())
548 .json(&self.data);
549
550 if let Some(ref _returning) = self.returning {
551 request = request.header("Prefer", "return=representation");
552 }
553
554 let response = request.send().await?;
555
556 if !response.status().is_success() {
557 let status = response.status();
558 let error_msg = match response.text().await {
559 Ok(text) => text,
560 Err(_) => format!("Update failed with status: {}", status),
561 };
562 return Err(Error::database(error_msg));
563 }
564
565 let result: Vec<T> = response.json().await?;
566 info!(
567 "UPDATE query executed successfully on table: {}",
568 self.table
569 );
570
571 Ok(result)
572 }
573}
574
575impl DeleteBuilder {
576 fn new(database: Database, table: String) -> Self {
577 Self {
578 database,
579 table,
580 filters: Vec::new(),
581 returning: None,
582 }
583 }
584
585 pub fn eq(mut self, column: &str, value: &str) -> Self {
587 self.filters.push(Filter {
588 column: column.to_string(),
589 operator: FilterOperator::Equal,
590 value: value.to_string(),
591 });
592 self
593 }
594
595 pub fn returning(mut self, columns: &str) -> Self {
597 self.returning = Some(columns.to_string());
598 self
599 }
600
601 pub async fn execute<T>(&self) -> Result<Vec<T>>
603 where
604 T: for<'de> Deserialize<'de>,
605 {
606 debug!("Executing DELETE query on table: {}", self.table);
607
608 let mut url = Url::parse(&format!("{}/{}", self.database.rest_url(), self.table))?;
609
610 let query_params = self.database.build_query_params(&self.filters);
612 for (key, value) in query_params {
613 url.query_pairs_mut().append_pair(&key, &value);
614 }
615
616 let mut request = self.database.http_client.delete(url.as_str());
617
618 if let Some(ref _returning) = self.returning {
619 request = request.header("Prefer", "return=representation");
620 }
621
622 let response = request.send().await?;
623
624 if !response.status().is_success() {
625 let status = response.status();
626 let error_msg = match response.text().await {
627 Ok(text) => text,
628 Err(_) => format!("Delete failed with status: {}", status),
629 };
630 return Err(Error::database(error_msg));
631 }
632
633 let result: Vec<T> = response.json().await?;
634 info!(
635 "DELETE query executed successfully on table: {}",
636 self.table
637 );
638
639 Ok(result)
640 }
641}