Skip to main content

edgebase_core/
table.rs

1//! EdgeBase Rust SDK — TableRef (immutable query builder)
2//!
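//! Table CRUD, query and batch calls delegate to Generated Core (api_core.rs), so no
//! table API paths are hardcoded here — the core is the single source of truth for them.
//! The one exception is `TableRef::sql`, which posts directly to `/api/sql`.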
5
6use crate::{Error, http_client::HttpClient};
7use crate::generated::api_core::GeneratedDbApi;
8use serde::Serialize;
9use serde_json::{json, Value};
10use std::collections::HashMap;
11use std::sync::Arc;
12
13type FilterTuple = (String, String, Value);
14
15/// Unified list query result.
16/// Returned by `TableRef::get_list()`.
17#[derive(Debug, Clone)]
18pub struct ListResult {
19    pub items: Vec<Value>,
20    pub total: Option<u64>,
21    pub page: Option<u64>,
22    pub per_page: Option<u64>,
23    pub has_more: Option<bool>,
24    pub cursor: Option<String>,
25}
26
27impl ListResult {
28    pub fn from_value(v: Value) -> Self {
29        let items = v.get("items")
30            .and_then(|i| i.as_array())
31            .cloned()
32            .unwrap_or_default();
33        Self {
34            items,
35            total: v.get("total").and_then(|t| t.as_u64()),
36            page: v.get("page").and_then(|p| p.as_u64()),
37            per_page: v.get("perPage").and_then(|p| p.as_u64()),
38            has_more: v.get("hasMore").and_then(|h| h.as_bool()),
39            cursor: v.get("cursor").and_then(|c| c.as_str()).map(|s| s.to_owned()),
40        }
41    }
42}
43
44/// Batch operation result.
45#[derive(Debug, Clone)]
46pub struct BatchResult {
47    pub total_processed: u64,
48    pub total_succeeded: u64,
49    pub errors: Vec<Value>,
50}
51
52impl BatchResult {
53    pub fn from_value(v: Value) -> Self {
54        Self {
55            total_processed: v.get("totalProcessed").and_then(|t| t.as_u64()).unwrap_or(0),
56            total_succeeded: v.get("totalSucceeded").and_then(|t| t.as_u64()).unwrap_or(0),
57            errors: v.get("errors")
58                .and_then(|e| e.as_array())
59                .cloned()
60                .unwrap_or_default(),
61        }
62    }
63}
64
65/// Upsert operation result.
66#[derive(Debug, Clone)]
67pub struct UpsertResult {
68    pub record: Value,
69    pub inserted: bool,
70}
71
72impl UpsertResult {
73    pub fn from_value(v: Value) -> Self {
74        let inserted = v.get("action").and_then(|a| a.as_str()) == Some("inserted");
75        Self { record: v, inserted }
76    }
77}
78
79// ── Core dispatch helpers ────────────────────────────────────────────
80// These mirror the JS SDK pattern: dispatch to single-instance vs dynamic based on instance_id.
81
82/// Dispatch a GET-style table operation (list, get, count, search) to the correct generated core method.
83async fn core_get(
84    core: &GeneratedDbApi<'_>,
85    method: &str,
86    namespace: &str,
87    instance_id: Option<&str>,
88    table: &str,
89    id: Option<&str>,
90    query: &HashMap<String, String>,
91) -> Result<Value, Error> {
92    match instance_id {
93        Some(iid) => {
94            // Dynamic DB
95            match method {
96                "list" => core.db_list_records(namespace, iid, table, query).await,
97                "get" => core.db_get_record(namespace, iid, table, id.unwrap(), query).await,
98                "count" => core.db_count_records(namespace, iid, table, query).await,
99                "search" => core.db_search_records(namespace, iid, table, query).await,
100                _ => unreachable!("unknown core_get method: {}", method),
101            }
102        }
103        None => {
104            // Single-instance DB
105            match method {
106                "list" => core.db_single_list_records(namespace, table, query).await,
107                "get" => core.db_single_get_record(namespace, table, id.unwrap(), query).await,
108                "count" => core.db_single_count_records(namespace, table, query).await,
109                "search" => core.db_single_search_records(namespace, table, query).await,
110                _ => unreachable!("unknown core_get method: {}", method),
111            }
112        }
113    }
114}
115
116/// Dispatch an insert to the correct generated core method.
117async fn core_insert(
118    core: &GeneratedDbApi<'_>,
119    namespace: &str,
120    instance_id: Option<&str>,
121    table: &str,
122    body: &Value,
123    query: &HashMap<String, String>,
124) -> Result<Value, Error> {
125    match instance_id {
126        Some(iid) => core.db_insert_record(namespace, iid, table, body, query).await,
127        None => core.db_single_insert_record(namespace, table, body, query).await,
128    }
129}
130
131/// Dispatch an update to the correct generated core method.
132async fn core_update(
133    core: &GeneratedDbApi<'_>,
134    namespace: &str,
135    instance_id: Option<&str>,
136    table: &str,
137    id: &str,
138    body: &Value,
139) -> Result<Value, Error> {
140    match instance_id {
141        Some(iid) => core.db_update_record(namespace, iid, table, id, body).await,
142        None => core.db_single_update_record(namespace, table, id, body).await,
143    }
144}
145
146/// Dispatch a delete to the correct generated core method.
147async fn core_delete(
148    core: &GeneratedDbApi<'_>,
149    namespace: &str,
150    instance_id: Option<&str>,
151    table: &str,
152    id: &str,
153) -> Result<Value, Error> {
154    match instance_id {
155        Some(iid) => core.db_delete_record(namespace, iid, table, id).await,
156        None => core.db_single_delete_record(namespace, table, id).await,
157    }
158}
159
160/// Dispatch a batch insert to the correct generated core method.
161async fn core_batch(
162    core: &GeneratedDbApi<'_>,
163    namespace: &str,
164    instance_id: Option<&str>,
165    table: &str,
166    body: &Value,
167    query: &HashMap<String, String>,
168) -> Result<Value, Error> {
169    match instance_id {
170        Some(iid) => core.db_batch_records(namespace, iid, table, body, query).await,
171        None => core.db_single_batch_records(namespace, table, body, query).await,
172    }
173}
174
175/// Dispatch a batch-by-filter to the correct generated core method.
176async fn core_batch_by_filter(
177    core: &GeneratedDbApi<'_>,
178    namespace: &str,
179    instance_id: Option<&str>,
180    table: &str,
181    body: &Value,
182    query: &HashMap<String, String>,
183) -> Result<Value, Error> {
184    match instance_id {
185        Some(iid) => core.db_batch_by_filter(namespace, iid, table, body, query).await,
186        None => core.db_single_batch_by_filter(namespace, table, body, query).await,
187    }
188}
189
/// Immutable query builder for an EdgeBase table.
///
/// Builder methods take `self` by value and return the updated builder, so a
/// query is assembled by chaining calls; the request methods borrow `&self`.
///
/// ```rust,ignore
/// # async fn example(client: &edgebase_core::EdgeBase) -> Result<(), edgebase_core::Error> {
/// let result = client.table("posts")
///     .where_("status", "==", "published")
///     .or_(|q| q.where_("status", "==", "draft"))
///     .order_by("createdAt", "desc")
///     .limit(20)
///     .get_list()
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct TableRef {
    /// Shared HTTP client used for every request issued through this builder.
    http: Arc<HttpClient>,
    /// Table name passed to the generated core methods.
    name: String,
    /// DB block namespace: 'shared' | 'workspace' | 'user' | ...
    namespace: String,
    /// DB instance ID for dynamic DOs (e.g. 'ws-456'). None for static DBs.
    instance_id: Option<String>,
    /// `(field, op, value)` tuples added by `where_`; sent as `filter`.
    filters: Vec<FilterTuple>,
    /// `(field, op, value)` tuples added by `or_`; sent as `orFilter`.
    or_filters: Vec<FilterTuple>,
    /// `[field, direction]` pairs added by `order_by`; sent as `sort`.
    sorts: Vec<[String; 2]>,
    /// Value for the `limit` query parameter, if set.
    limit_val: Option<u32>,
    /// Value for the `offset` query parameter, if set.
    offset_val: Option<u32>,
    /// Value for the `page` query parameter, if set.
    page_val: Option<u32>,
    /// Value for the `search` query parameter; when set, `get_list` uses the search operation.
    search_val: Option<String>,
    /// Value for the `after` query parameter, if set.
    after_val: Option<String>,
    /// Value for the `before` query parameter, if set.
    before_val: Option<String>,
}
222
223pub struct OrBuilder {
224    pub filters: Vec<FilterTuple>,
225}
226
227impl OrBuilder {
228    pub fn new() -> Self {
229        Self { filters: vec![] }
230    }
231
232    pub fn where_<V: Serialize>(mut self, field: &str, op: &str, value: V) -> Self {
233        self.filters.push((
234            field.into(),
235            op.into(),
236            serde_json::to_value(value).unwrap_or(Value::Null),
237        ));
238        self
239    }
240}
241
242impl TableRef {
243    pub fn new(http: Arc<HttpClient>, name: &str) -> Self {
244        Self::with_db(http, name, "shared", None)
245    }
246
247    /// Create a TableRef for a specific DB namespace + optional instance ID.
248    pub fn with_db(http: Arc<HttpClient>, name: &str, namespace: &str, instance_id: Option<&str>) -> Self {
249        Self {
250            http,
251            name: name.to_string(),
252            namespace: namespace.to_string(),
253            instance_id: instance_id.map(|s| s.to_string()),
254            filters: vec![],
255            or_filters: vec![],
256            sorts: vec![],
257            limit_val: None,
258            offset_val: None,
259            page_val: None,
260            search_val: None,
261            after_val: None,
262            before_val: None,
263        }
264    }
265
266    // ── Query Builder ────────────────────────────────────────────
267
268    pub fn where_<V: Serialize>(self, field: &str, op: &str, value: V) -> Self {
269        let mut c = self;
270        c.filters.push((
271            field.into(),
272            op.into(),
273            serde_json::to_value(value).unwrap_or(Value::Null),
274        ));
275        c
276    }
277
278    pub fn or_(self, builder_fn: impl FnOnce(OrBuilder) -> OrBuilder) -> Self {
279        let mut c = self;
280        let builder = builder_fn(OrBuilder::new());
281        c.or_filters.extend(builder.filters);
282        c
283    }
284
285    pub fn order_by(self, field: &str, direction: &str) -> Self {
286        let mut c = self;
287        c.sorts.push([field.into(), direction.into()]);
288        c
289    }
290
291    pub fn limit(mut self, n: u32) -> Self { self.limit_val = Some(n); self }
292    pub fn offset(mut self, n: u32) -> Self { self.offset_val = Some(n); self }
293    pub fn page(mut self, n: u32) -> Self { self.page_val = Some(n); self }
294    pub fn search(mut self, q: &str) -> Self { self.search_val = Some(q.into()); self }
295    pub fn after(mut self, cursor: &str) -> Self { self.after_val = Some(cursor.into()); self }
296    pub fn before(mut self, cursor: &str) -> Self { self.before_val = Some(cursor.into()); self }
297
298    /// Collection name.
299    pub fn name(&self) -> &str { &self.name }
300
    /// Create a temporary GeneratedDbApi for dispatching calls.
    ///
    /// The returned value borrows this table's HTTP client; each request method
    /// builds one and passes it to the `core_*` dispatch helpers.
    fn core(&self) -> GeneratedDbApi<'_> {
        GeneratedDbApi::new(&self.http)
    }
305
306    fn validate_query_state(&self) -> Result<(), Error> {
307        let has_cursor = self.after_val.is_some() || self.before_val.is_some();
308        let has_offset = self.offset_val.is_some() || self.page_val.is_some();
309        if has_cursor && has_offset {
310            return Err(Error::Api {
311                status: 400,
312                message: "Cannot use page()/offset() with after()/before() — choose offset or cursor pagination".to_string(),
313            });
314        }
315        Ok(())
316    }
317
318    /// Instance ID as an Option<&str> for dispatch helpers.
319    fn iid(&self) -> Option<&str> {
320        self.instance_id.as_deref()
321    }
322
323    /// Build query parameters from current state as a HashMap.
324    fn build_query_params(&self) -> HashMap<String, String> {
325        let mut params = HashMap::new();
326        if !self.filters.is_empty() {
327            params.insert("filter".to_string(), serde_json::to_string(&self.filters).unwrap_or_default());
328        }
329        if !self.or_filters.is_empty() {
330            params.insert("orFilter".to_string(), serde_json::to_string(&self.or_filters).unwrap_or_default());
331        }
332        if !self.sorts.is_empty() {
333            let sort_str = self.sorts.iter()
334                .map(|s| format!("{}:{}", s[0], s[1]))
335                .collect::<Vec<_>>()
336                .join(",");
337            params.insert("sort".to_string(), sort_str);
338        }
339        if let Some(v) = self.limit_val   { params.insert("limit".to_string(), v.to_string()); }
340        if let Some(v) = self.offset_val  { params.insert("offset".to_string(), v.to_string()); }
341        if let Some(v) = self.page_val    { params.insert("page".to_string(), v.to_string()); }
342        if let Some(ref v) = self.search_val { params.insert("search".to_string(), v.clone()); }
343        if let Some(ref v) = self.after_val  { params.insert("after".to_string(), v.clone()); }
344        if let Some(ref v) = self.before_val { params.insert("before".to_string(), v.clone()); }
345        params
346    }
347
348    // ── CRUD ─────────────────────────────────────────────────────
349
350    /// List documents matching current filters/sorts.
351    pub async fn get_list(&self) -> Result<Value, Error> {
352        self.validate_query_state()?;
353        let query = self.build_query_params();
354        let core = self.core();
355        if self.search_val.is_some() {
356            core_get(&core, "search", &self.namespace, self.iid(), &self.name, None, &query).await
357        } else {
358            core_get(&core, "list", &self.namespace, self.iid(), &self.name, None, &query).await
359        }
360    }
361
362    /// Get the first record matching the current query conditions.
363    /// Returns `Ok(Value::Null)` if no records match.
364    pub async fn get_first(&self) -> Result<Value, Error> {
365        let result = self.clone().limit(1).get_list().await?;
366        let items = result.get("items").and_then(|i| i.as_array());
367        match items.and_then(|arr| arr.first()) {
368            Some(item) => Ok(item.clone()),
369            None => Ok(Value::Null),
370        }
371    }
372
373    /// Execute admin SQL scoped to this table's database namespace.
374    pub async fn sql(&self, query: &str, params: &[Value]) -> Result<Vec<Value>, Error> {
375        let mut body = json!({
376            "namespace": self.namespace,
377            "sql": query,
378            "params": params,
379        });
380        if let Some(instance_id) = self.iid() {
381            body["id"] = json!(instance_id);
382        }
383        let result = self.http.post("/api/sql", &body).await?;
384        Ok(result
385            .get("items")
386            .and_then(|items| items.as_array())
387            .cloned()
388            .unwrap_or_default())
389    }
390
391    /// Fetch a single document by ID.
392    pub async fn get_one(&self, id: &str) -> Result<Value, Error> {
393        let core = self.core();
394        let query = HashMap::new();
395        core_get(&core, "get", &self.namespace, self.iid(), &self.name, Some(id), &query).await
396    }
397
398    /// Create a new document.
399    pub async fn insert(&self, record: &Value) -> Result<Value, Error> {
400        let core = self.core();
401        let query = HashMap::new();
402        core_insert(&core, &self.namespace, self.iid(), &self.name, record, &query).await
403    }
404
405    /// Update a document by ID.
406    pub async fn update(&self, id: &str, data: &Value) -> Result<Value, Error> {
407        let core = self.core();
408        core_update(&core, &self.namespace, self.iid(), &self.name, id, data).await
409    }
410
411    /// Delete a document by ID.
412    pub async fn delete(&self, id: &str) -> Result<Value, Error> {
413        let core = self.core();
414        core_delete(&core, &self.namespace, self.iid(), &self.name, id).await
415    }
416
417    /// Create or update a document.
418    pub async fn upsert(&self, record: &Value, conflict_target: Option<&str>) -> Result<Value, Error> {
419        let core = self.core();
420        let mut query = HashMap::new();
421        query.insert("upsert".to_string(), "true".to_string());
422        if let Some(ct) = conflict_target {
423            query.insert("conflictTarget".to_string(), ct.to_string());
424        }
425        core_insert(&core, &self.namespace, self.iid(), &self.name, record, &query).await
426    }
427
428    /// Count documents matching current filters.
429    pub async fn count(&self) -> Result<u64, Error> {
430        self.validate_query_state()?;
431        let query = self.build_query_params();
432        let core = self.core();
433        let result = core_get(&core, "count", &self.namespace, self.iid(), &self.name, None, &query).await?;
434        Ok(result.get("total").and_then(|v| v.as_u64()).unwrap_or(0))
435    }
436
437    // ── Batch ────────────────────────────────────────────────────
438
439    /// Create multiple documents in server-side batches.
440    pub async fn insert_many(&self, records: Vec<Value>) -> Result<Value, Error> {
441        let core = self.core();
442        let query = HashMap::new();
443        if records.len() <= 500 {
444            let body = json!({ "inserts": records });
445            return core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await;
446        }
447
448        let mut inserted = Vec::new();
449        for chunk in records.chunks(500) {
450            let body = json!({ "inserts": chunk });
451            let result = core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await?;
452            inserted.extend(
453                result
454                    .get("inserted")
455                    .and_then(Value::as_array)
456                    .cloned()
457                    .unwrap_or_default(),
458            );
459        }
460        Ok(json!({ "inserted": inserted }))
461    }
462
463    /// Upsert multiple documents in server-side batches.
464    pub async fn upsert_many(&self, records: Vec<Value>, conflict_target: Option<&str>) -> Result<Value, Error> {
465        let core = self.core();
466        let mut query = HashMap::new();
467        query.insert("upsert".to_string(), "true".to_string());
468        if let Some(ct) = conflict_target {
469            query.insert("conflictTarget".to_string(), ct.to_string());
470        }
471        if records.len() <= 500 {
472            let body = json!({ "inserts": records });
473            return core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await;
474        }
475
476        let mut inserted = Vec::new();
477        for chunk in records.chunks(500) {
478            let body = json!({ "inserts": chunk });
479            let result = core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await?;
480            inserted.extend(
481                result
482                    .get("inserted")
483                    .and_then(Value::as_array)
484                    .cloned()
485                    .unwrap_or_default(),
486            );
487        }
488        Ok(json!({ "inserted": inserted }))
489    }
490
491    /// Update all documents matching current filters.
492    pub async fn update_many(&self, update: &Value) -> Result<Value, Error> {
493        if self.filters.is_empty() {
494            return Err(Error::Api {
495                status: 400,
496                message: "update_many requires at least one where() filter".to_string(),
497            });
498        }
499        let core = self.core();
500        let mut body = json!({
501            "action": "update",
502            "filter": self.filters,
503            "update": update
504        });
505        if !self.or_filters.is_empty() {
506            body.as_object_mut().unwrap().insert("orFilter".to_string(), json!(self.or_filters));
507        }
508        let query = HashMap::new();
509        core_batch_by_filter(&core, &self.namespace, self.iid(), &self.name, &body, &query).await
510    }
511
512    /// Delete all documents matching current filters.
513    pub async fn delete_many(&self) -> Result<Value, Error> {
514        if self.filters.is_empty() {
515            return Err(Error::Api {
516                status: 400,
517                message: "delete_many requires at least one where() filter".to_string(),
518            });
519        }
520        let core = self.core();
521        let mut body = json!({
522            "action": "delete",
523            "filter": self.filters
524        });
525        if !self.or_filters.is_empty() {
526            body.as_object_mut().unwrap().insert("orFilter".to_string(), json!(self.or_filters));
527        }
528        let query = HashMap::new();
529        core_batch_by_filter(&core, &self.namespace, self.iid(), &self.name, &body, &query).await
530    }
531
532    /// Returns a document-scoped helper for record operations by id.
533    pub fn doc(&self, id: &str) -> DocRef {
534        DocRef {
535            table: self.clone(),
536            id: id.to_string(),
537        }
538    }
539}
540
541#[derive(Clone)]
542pub struct DocRef {
543    table: TableRef,
544    id: String,
545}
546
547impl DocRef {
548    pub async fn get(&self) -> Result<Value, Error> {
549        self.table.get_one(&self.id).await
550    }
551
552    pub async fn update(&self, data: &Value) -> Result<Value, Error> {
553        self.table.update(&self.id, data).await
554    }
555
556    pub async fn delete(&self) -> Result<Value, Error> {
557        self.table.delete(&self.id).await
558    }
559}