1use crate::{Error, http_client::HttpClient};
7use crate::generated::api_core::GeneratedDbApi;
8use serde::Serialize;
9use serde_json::{json, Value};
10use std::collections::HashMap;
11use std::sync::Arc;
12
/// One filter condition stored as `(field, operator, value)`.
///
/// Lists of these tuples are serialized to JSON for the `filter` / `orFilter`
/// query parameters and request-body fields.
type FilterTuple = (String, String, Value);
14
15#[derive(Debug, Clone)]
18pub struct ListResult {
19 pub items: Vec<Value>,
20 pub total: Option<u64>,
21 pub page: Option<u64>,
22 pub per_page: Option<u64>,
23 pub has_more: Option<bool>,
24 pub cursor: Option<String>,
25}
26
27impl ListResult {
28 pub fn from_value(v: Value) -> Self {
29 let items = v.get("items")
30 .and_then(|i| i.as_array())
31 .cloned()
32 .unwrap_or_default();
33 Self {
34 items,
35 total: v.get("total").and_then(|t| t.as_u64()),
36 page: v.get("page").and_then(|p| p.as_u64()),
37 per_page: v.get("perPage").and_then(|p| p.as_u64()),
38 has_more: v.get("hasMore").and_then(|h| h.as_bool()),
39 cursor: v.get("cursor").and_then(|c| c.as_str()).map(|s| s.to_owned()),
40 }
41 }
42}
43
44#[derive(Debug, Clone)]
46pub struct BatchResult {
47 pub total_processed: u64,
48 pub total_succeeded: u64,
49 pub errors: Vec<Value>,
50}
51
52impl BatchResult {
53 pub fn from_value(v: Value) -> Self {
54 Self {
55 total_processed: v.get("totalProcessed").and_then(|t| t.as_u64()).unwrap_or(0),
56 total_succeeded: v.get("totalSucceeded").and_then(|t| t.as_u64()).unwrap_or(0),
57 errors: v.get("errors")
58 .and_then(|e| e.as_array())
59 .cloned()
60 .unwrap_or_default(),
61 }
62 }
63}
64
65#[derive(Debug, Clone)]
67pub struct UpsertResult {
68 pub record: Value,
69 pub inserted: bool,
70}
71
72impl UpsertResult {
73 pub fn from_value(v: Value) -> Self {
74 let inserted = v.get("action").and_then(|a| a.as_str()) == Some("inserted");
75 Self { record: v, inserted }
76 }
77}
78
79async fn core_get(
84 core: &GeneratedDbApi<'_>,
85 method: &str,
86 namespace: &str,
87 instance_id: Option<&str>,
88 table: &str,
89 id: Option<&str>,
90 query: &HashMap<String, String>,
91) -> Result<Value, Error> {
92 match instance_id {
93 Some(iid) => {
94 match method {
96 "list" => core.db_list_records(namespace, iid, table, query).await,
97 "get" => core.db_get_record(namespace, iid, table, id.unwrap(), query).await,
98 "count" => core.db_count_records(namespace, iid, table, query).await,
99 "search" => core.db_search_records(namespace, iid, table, query).await,
100 _ => unreachable!("unknown core_get method: {}", method),
101 }
102 }
103 None => {
104 match method {
106 "list" => core.db_single_list_records(namespace, table, query).await,
107 "get" => core.db_single_get_record(namespace, table, id.unwrap(), query).await,
108 "count" => core.db_single_count_records(namespace, table, query).await,
109 "search" => core.db_single_search_records(namespace, table, query).await,
110 _ => unreachable!("unknown core_get method: {}", method),
111 }
112 }
113 }
114}
115
116async fn core_insert(
118 core: &GeneratedDbApi<'_>,
119 namespace: &str,
120 instance_id: Option<&str>,
121 table: &str,
122 body: &Value,
123 query: &HashMap<String, String>,
124) -> Result<Value, Error> {
125 match instance_id {
126 Some(iid) => core.db_insert_record(namespace, iid, table, body, query).await,
127 None => core.db_single_insert_record(namespace, table, body, query).await,
128 }
129}
130
131async fn core_update(
133 core: &GeneratedDbApi<'_>,
134 namespace: &str,
135 instance_id: Option<&str>,
136 table: &str,
137 id: &str,
138 body: &Value,
139) -> Result<Value, Error> {
140 match instance_id {
141 Some(iid) => core.db_update_record(namespace, iid, table, id, body).await,
142 None => core.db_single_update_record(namespace, table, id, body).await,
143 }
144}
145
146async fn core_delete(
148 core: &GeneratedDbApi<'_>,
149 namespace: &str,
150 instance_id: Option<&str>,
151 table: &str,
152 id: &str,
153) -> Result<Value, Error> {
154 match instance_id {
155 Some(iid) => core.db_delete_record(namespace, iid, table, id).await,
156 None => core.db_single_delete_record(namespace, table, id).await,
157 }
158}
159
160async fn core_batch(
162 core: &GeneratedDbApi<'_>,
163 namespace: &str,
164 instance_id: Option<&str>,
165 table: &str,
166 body: &Value,
167 query: &HashMap<String, String>,
168) -> Result<Value, Error> {
169 match instance_id {
170 Some(iid) => core.db_batch_records(namespace, iid, table, body, query).await,
171 None => core.db_single_batch_records(namespace, table, body, query).await,
172 }
173}
174
175async fn core_batch_by_filter(
177 core: &GeneratedDbApi<'_>,
178 namespace: &str,
179 instance_id: Option<&str>,
180 table: &str,
181 body: &Value,
182 query: &HashMap<String, String>,
183) -> Result<Value, Error> {
184 match instance_id {
185 Some(iid) => core.db_batch_by_filter(namespace, iid, table, body, query).await,
186 None => core.db_single_batch_by_filter(namespace, table, body, query).await,
187 }
188}
189
/// Reference to a table plus the query state accumulated by the builder methods.
///
/// Builder methods (`where_`, `or_`, `order_by`, `limit`, ...) take `self` by
/// value and return the updated reference; the struct is `Clone` so a base
/// reference can be reused for several queries.
#[derive(Clone)]
pub struct TableRef {
    /// Shared HTTP client used for every request.
    http: Arc<HttpClient>,
    /// Table name.
    name: String,
    /// Namespace the table lives in (`"shared"` when built with `new`).
    namespace: String,
    /// Optional instance id; when set, the instance-scoped endpoints are used.
    instance_id: Option<String>,
    /// Conditions added with `where_`.
    filters: Vec<FilterTuple>,
    /// Conditions added with `or_`.
    or_filters: Vec<FilterTuple>,
    /// `[field, direction]` pairs added with `order_by`.
    sorts: Vec<[String; 2]>,
    /// Value set by `limit`.
    limit_val: Option<u32>,
    /// Value set by `offset`.
    offset_val: Option<u32>,
    /// Value set by `page`.
    page_val: Option<u32>,
    /// Search term set by `search`; when present, `get_list` issues a search request.
    search_val: Option<String>,
    /// Cursor set by `after`.
    after_val: Option<String>,
    /// Cursor set by `before`.
    before_val: Option<String>,
}
222
223pub struct OrBuilder {
224 pub filters: Vec<FilterTuple>,
225}
226
227impl OrBuilder {
228 pub fn new() -> Self {
229 Self { filters: vec![] }
230 }
231
232 pub fn where_<V: Serialize>(mut self, field: &str, op: &str, value: V) -> Self {
233 self.filters.push((
234 field.into(),
235 op.into(),
236 serde_json::to_value(value).unwrap_or(Value::Null),
237 ));
238 self
239 }
240}
241
242impl TableRef {
243 pub fn new(http: Arc<HttpClient>, name: &str) -> Self {
244 Self::with_db(http, name, "shared", None)
245 }
246
247 pub fn with_db(http: Arc<HttpClient>, name: &str, namespace: &str, instance_id: Option<&str>) -> Self {
249 Self {
250 http,
251 name: name.to_string(),
252 namespace: namespace.to_string(),
253 instance_id: instance_id.map(|s| s.to_string()),
254 filters: vec![],
255 or_filters: vec![],
256 sorts: vec![],
257 limit_val: None,
258 offset_val: None,
259 page_val: None,
260 search_val: None,
261 after_val: None,
262 before_val: None,
263 }
264 }
265
266 pub fn where_<V: Serialize>(self, field: &str, op: &str, value: V) -> Self {
269 let mut c = self;
270 c.filters.push((
271 field.into(),
272 op.into(),
273 serde_json::to_value(value).unwrap_or(Value::Null),
274 ));
275 c
276 }
277
278 pub fn or_(self, builder_fn: impl FnOnce(OrBuilder) -> OrBuilder) -> Self {
279 let mut c = self;
280 let builder = builder_fn(OrBuilder::new());
281 c.or_filters.extend(builder.filters);
282 c
283 }
284
285 pub fn order_by(self, field: &str, direction: &str) -> Self {
286 let mut c = self;
287 c.sorts.push([field.into(), direction.into()]);
288 c
289 }
290
291 pub fn limit(mut self, n: u32) -> Self { self.limit_val = Some(n); self }
292 pub fn offset(mut self, n: u32) -> Self { self.offset_val = Some(n); self }
293 pub fn page(mut self, n: u32) -> Self { self.page_val = Some(n); self }
294 pub fn search(mut self, q: &str) -> Self { self.search_val = Some(q.into()); self }
295 pub fn after(mut self, cursor: &str) -> Self { self.after_val = Some(cursor.into()); self }
296 pub fn before(mut self, cursor: &str) -> Self { self.before_val = Some(cursor.into()); self }
297
298 pub fn name(&self) -> &str { &self.name }
300
301 fn core(&self) -> GeneratedDbApi<'_> {
303 GeneratedDbApi::new(&self.http)
304 }
305
306 fn validate_query_state(&self) -> Result<(), Error> {
307 let has_cursor = self.after_val.is_some() || self.before_val.is_some();
308 let has_offset = self.offset_val.is_some() || self.page_val.is_some();
309 if has_cursor && has_offset {
310 return Err(Error::Api {
311 status: 400,
312 message: "Cannot use page()/offset() with after()/before() — choose offset or cursor pagination".to_string(),
313 });
314 }
315 Ok(())
316 }
317
318 fn iid(&self) -> Option<&str> {
320 self.instance_id.as_deref()
321 }
322
323 fn build_query_params(&self) -> HashMap<String, String> {
325 let mut params = HashMap::new();
326 if !self.filters.is_empty() {
327 params.insert("filter".to_string(), serde_json::to_string(&self.filters).unwrap_or_default());
328 }
329 if !self.or_filters.is_empty() {
330 params.insert("orFilter".to_string(), serde_json::to_string(&self.or_filters).unwrap_or_default());
331 }
332 if !self.sorts.is_empty() {
333 let sort_str = self.sorts.iter()
334 .map(|s| format!("{}:{}", s[0], s[1]))
335 .collect::<Vec<_>>()
336 .join(",");
337 params.insert("sort".to_string(), sort_str);
338 }
339 if let Some(v) = self.limit_val { params.insert("limit".to_string(), v.to_string()); }
340 if let Some(v) = self.offset_val { params.insert("offset".to_string(), v.to_string()); }
341 if let Some(v) = self.page_val { params.insert("page".to_string(), v.to_string()); }
342 if let Some(ref v) = self.search_val { params.insert("search".to_string(), v.clone()); }
343 if let Some(ref v) = self.after_val { params.insert("after".to_string(), v.clone()); }
344 if let Some(ref v) = self.before_val { params.insert("before".to_string(), v.clone()); }
345 params
346 }
347
348 pub async fn get_list(&self) -> Result<Value, Error> {
352 self.validate_query_state()?;
353 let query = self.build_query_params();
354 let core = self.core();
355 if self.search_val.is_some() {
356 core_get(&core, "search", &self.namespace, self.iid(), &self.name, None, &query).await
357 } else {
358 core_get(&core, "list", &self.namespace, self.iid(), &self.name, None, &query).await
359 }
360 }
361
362 pub async fn get_first(&self) -> Result<Value, Error> {
365 let result = self.clone().limit(1).get_list().await?;
366 let items = result.get("items").and_then(|i| i.as_array());
367 match items.and_then(|arr| arr.first()) {
368 Some(item) => Ok(item.clone()),
369 None => Ok(Value::Null),
370 }
371 }
372
373 pub async fn sql(&self, query: &str, params: &[Value]) -> Result<Vec<Value>, Error> {
375 let mut body = json!({
376 "namespace": self.namespace,
377 "sql": query,
378 "params": params,
379 });
380 if let Some(instance_id) = self.iid() {
381 body["id"] = json!(instance_id);
382 }
383 let result = self.http.post("/api/sql", &body).await?;
384 Ok(result
385 .get("items")
386 .and_then(|items| items.as_array())
387 .cloned()
388 .unwrap_or_default())
389 }
390
391 pub async fn get_one(&self, id: &str) -> Result<Value, Error> {
393 let core = self.core();
394 let query = HashMap::new();
395 core_get(&core, "get", &self.namespace, self.iid(), &self.name, Some(id), &query).await
396 }
397
398 pub async fn insert(&self, record: &Value) -> Result<Value, Error> {
400 let core = self.core();
401 let query = HashMap::new();
402 core_insert(&core, &self.namespace, self.iid(), &self.name, record, &query).await
403 }
404
405 pub async fn update(&self, id: &str, data: &Value) -> Result<Value, Error> {
407 let core = self.core();
408 core_update(&core, &self.namespace, self.iid(), &self.name, id, data).await
409 }
410
411 pub async fn delete(&self, id: &str) -> Result<Value, Error> {
413 let core = self.core();
414 core_delete(&core, &self.namespace, self.iid(), &self.name, id).await
415 }
416
417 pub async fn upsert(&self, record: &Value, conflict_target: Option<&str>) -> Result<Value, Error> {
419 let core = self.core();
420 let mut query = HashMap::new();
421 query.insert("upsert".to_string(), "true".to_string());
422 if let Some(ct) = conflict_target {
423 query.insert("conflictTarget".to_string(), ct.to_string());
424 }
425 core_insert(&core, &self.namespace, self.iid(), &self.name, record, &query).await
426 }
427
428 pub async fn count(&self) -> Result<u64, Error> {
430 self.validate_query_state()?;
431 let query = self.build_query_params();
432 let core = self.core();
433 let result = core_get(&core, "count", &self.namespace, self.iid(), &self.name, None, &query).await?;
434 Ok(result.get("total").and_then(|v| v.as_u64()).unwrap_or(0))
435 }
436
437 pub async fn insert_many(&self, records: Vec<Value>) -> Result<Value, Error> {
441 let core = self.core();
442 let query = HashMap::new();
443 if records.len() <= 500 {
444 let body = json!({ "inserts": records });
445 return core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await;
446 }
447
448 let mut inserted = Vec::new();
449 for chunk in records.chunks(500) {
450 let body = json!({ "inserts": chunk });
451 let result = core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await?;
452 inserted.extend(
453 result
454 .get("inserted")
455 .and_then(Value::as_array)
456 .cloned()
457 .unwrap_or_default(),
458 );
459 }
460 Ok(json!({ "inserted": inserted }))
461 }
462
463 pub async fn upsert_many(&self, records: Vec<Value>, conflict_target: Option<&str>) -> Result<Value, Error> {
465 let core = self.core();
466 let mut query = HashMap::new();
467 query.insert("upsert".to_string(), "true".to_string());
468 if let Some(ct) = conflict_target {
469 query.insert("conflictTarget".to_string(), ct.to_string());
470 }
471 if records.len() <= 500 {
472 let body = json!({ "inserts": records });
473 return core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await;
474 }
475
476 let mut inserted = Vec::new();
477 for chunk in records.chunks(500) {
478 let body = json!({ "inserts": chunk });
479 let result = core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await?;
480 inserted.extend(
481 result
482 .get("inserted")
483 .and_then(Value::as_array)
484 .cloned()
485 .unwrap_or_default(),
486 );
487 }
488 Ok(json!({ "inserted": inserted }))
489 }
490
491 pub async fn update_many(&self, update: &Value) -> Result<Value, Error> {
493 if self.filters.is_empty() {
494 return Err(Error::Api {
495 status: 400,
496 message: "update_many requires at least one where() filter".to_string(),
497 });
498 }
499 let core = self.core();
500 let mut body = json!({
501 "action": "update",
502 "filter": self.filters,
503 "update": update
504 });
505 if !self.or_filters.is_empty() {
506 body.as_object_mut().unwrap().insert("orFilter".to_string(), json!(self.or_filters));
507 }
508 let query = HashMap::new();
509 core_batch_by_filter(&core, &self.namespace, self.iid(), &self.name, &body, &query).await
510 }
511
512 pub async fn delete_many(&self) -> Result<Value, Error> {
514 if self.filters.is_empty() {
515 return Err(Error::Api {
516 status: 400,
517 message: "delete_many requires at least one where() filter".to_string(),
518 });
519 }
520 let core = self.core();
521 let mut body = json!({
522 "action": "delete",
523 "filter": self.filters
524 });
525 if !self.or_filters.is_empty() {
526 body.as_object_mut().unwrap().insert("orFilter".to_string(), json!(self.or_filters));
527 }
528 let query = HashMap::new();
529 core_batch_by_filter(&core, &self.namespace, self.iid(), &self.name, &body, &query).await
530 }
531
532 pub fn doc(&self, id: &str) -> DocRef {
534 DocRef {
535 table: self.clone(),
536 id: id.to_string(),
537 }
538 }
539}
540
541#[derive(Clone)]
542pub struct DocRef {
543 table: TableRef,
544 id: String,
545}
546
547impl DocRef {
548 pub async fn get(&self) -> Result<Value, Error> {
549 self.table.get_one(&self.id).await
550 }
551
552 pub async fn update(&self, data: &Value) -> Result<Value, Error> {
553 self.table.update(&self.id, data).await
554 }
555
556 pub async fn delete(&self) -> Result<Value, Error> {
557 self.table.delete(&self.id).await
558 }
559}