Skip to main content

edgebase_core/
table.rs

1//! EdgeBase Rust SDK — TableRef (immutable query builder)
2//!
3//! All HTTP calls delegate to Generated Core (api_core.rs).
4//! No hardcoded API paths — the core is the single source of truth.
5
6use crate::{Error, http_client::HttpClient};
7use crate::generated::api_core::GeneratedDbApi;
8use serde::Serialize;
9use serde_json::{json, Value};
10use std::collections::HashMap;
11use std::sync::Arc;
12
13type FilterTuple = (String, String, Value);
14
15/// Unified list query result.
16/// Returned by `TableRef::get_list()`.
17#[derive(Debug, Clone)]
18pub struct ListResult {
19    pub items: Vec<Value>,
20    pub total: Option<u64>,
21    pub page: Option<u64>,
22    pub per_page: Option<u64>,
23    pub has_more: Option<bool>,
24    pub cursor: Option<String>,
25}
26
27impl ListResult {
28    pub fn from_value(v: Value) -> Self {
29        let items = v.get("items")
30            .and_then(|i| i.as_array())
31            .cloned()
32            .unwrap_or_default();
33        Self {
34            items,
35            total: v.get("total").and_then(|t| t.as_u64()),
36            page: v.get("page").and_then(|p| p.as_u64()),
37            per_page: v.get("perPage").and_then(|p| p.as_u64()),
38            has_more: v.get("hasMore").and_then(|h| h.as_bool()),
39            cursor: v.get("cursor").and_then(|c| c.as_str()).map(|s| s.to_owned()),
40        }
41    }
42}
43
44/// Batch operation result.
45#[derive(Debug, Clone)]
46pub struct BatchResult {
47    pub total_processed: u64,
48    pub total_succeeded: u64,
49    pub errors: Vec<Value>,
50}
51
52impl BatchResult {
53    pub fn from_value(v: Value) -> Self {
54        Self {
55            total_processed: v.get("totalProcessed").and_then(|t| t.as_u64()).unwrap_or(0),
56            total_succeeded: v.get("totalSucceeded").and_then(|t| t.as_u64()).unwrap_or(0),
57            errors: v.get("errors")
58                .and_then(|e| e.as_array())
59                .cloned()
60                .unwrap_or_default(),
61        }
62    }
63}
64
65/// Upsert operation result.
66#[derive(Debug, Clone)]
67pub struct UpsertResult {
68    pub record: Value,
69    pub inserted: bool,
70}
71
72impl UpsertResult {
73    pub fn from_value(v: Value) -> Self {
74        let inserted = v.get("action").and_then(|a| a.as_str()) == Some("inserted");
75        Self { record: v, inserted }
76    }
77}
78
79// ── Core dispatch helpers ────────────────────────────────────────────
80// These mirror the JS SDK pattern: dispatch to single-instance vs dynamic based on instance_id.
81
82/// Dispatch a GET-style table operation (list, get, count, search) to the correct generated core method.
83async fn core_get(
84    core: &GeneratedDbApi<'_>,
85    method: &str,
86    namespace: &str,
87    instance_id: Option<&str>,
88    table: &str,
89    id: Option<&str>,
90    query: &HashMap<String, String>,
91) -> Result<Value, Error> {
92    match instance_id {
93        Some(iid) => {
94            // Dynamic DB
95            match method {
96                "list" => core.db_list_records(namespace, iid, table, query).await,
97                "get" => core.db_get_record(namespace, iid, table, id.unwrap(), query).await,
98                "count" => core.db_count_records(namespace, iid, table, query).await,
99                "search" => core.db_search_records(namespace, iid, table, query).await,
100                _ => unreachable!("unknown core_get method: {}", method),
101            }
102        }
103        None => {
104            // Single-instance DB
105            match method {
106                "list" => core.db_single_list_records(namespace, table, query).await,
107                "get" => core.db_single_get_record(namespace, table, id.unwrap(), query).await,
108                "count" => core.db_single_count_records(namespace, table, query).await,
109                "search" => core.db_single_search_records(namespace, table, query).await,
110                _ => unreachable!("unknown core_get method: {}", method),
111            }
112        }
113    }
114}
115
116/// Dispatch an insert to the correct generated core method.
117async fn core_insert(
118    core: &GeneratedDbApi<'_>,
119    namespace: &str,
120    instance_id: Option<&str>,
121    table: &str,
122    body: &Value,
123    query: &HashMap<String, String>,
124) -> Result<Value, Error> {
125    match instance_id {
126        Some(iid) => core.db_insert_record(namespace, iid, table, body, query).await,
127        None => core.db_single_insert_record(namespace, table, body, query).await,
128    }
129}
130
131/// Dispatch an update to the correct generated core method.
132async fn core_update(
133    core: &GeneratedDbApi<'_>,
134    namespace: &str,
135    instance_id: Option<&str>,
136    table: &str,
137    id: &str,
138    body: &Value,
139) -> Result<Value, Error> {
140    match instance_id {
141        Some(iid) => core.db_update_record(namespace, iid, table, id, body).await,
142        None => core.db_single_update_record(namespace, table, id, body).await,
143    }
144}
145
146/// Dispatch a delete to the correct generated core method.
147async fn core_delete(
148    core: &GeneratedDbApi<'_>,
149    namespace: &str,
150    instance_id: Option<&str>,
151    table: &str,
152    id: &str,
153) -> Result<Value, Error> {
154    match instance_id {
155        Some(iid) => core.db_delete_record(namespace, iid, table, id).await,
156        None => core.db_single_delete_record(namespace, table, id).await,
157    }
158}
159
160/// Dispatch a batch insert to the correct generated core method.
161async fn core_batch(
162    core: &GeneratedDbApi<'_>,
163    namespace: &str,
164    instance_id: Option<&str>,
165    table: &str,
166    body: &Value,
167    query: &HashMap<String, String>,
168) -> Result<Value, Error> {
169    match instance_id {
170        Some(iid) => core.db_batch_records(namespace, iid, table, body, query).await,
171        None => core.db_single_batch_records(namespace, table, body, query).await,
172    }
173}
174
175/// Dispatch a batch-by-filter to the correct generated core method.
176async fn core_batch_by_filter(
177    core: &GeneratedDbApi<'_>,
178    namespace: &str,
179    instance_id: Option<&str>,
180    table: &str,
181    body: &Value,
182    query: &HashMap<String, String>,
183) -> Result<Value, Error> {
184    match instance_id {
185        Some(iid) => core.db_batch_by_filter(namespace, iid, table, body, query).await,
186        None => core.db_single_batch_by_filter(namespace, table, body, query).await,
187    }
188}
189
/// Immutable query builder for an EdgeBase table.
///
/// Builder methods consume the value and return the updated one, so a
/// `TableRef` can be cloned and extended without affecting the original.
///
/// ```rust,ignore
/// # async fn example(client: &edgebase_core::EdgeBase) -> Result<(), edgebase_core::Error> {
/// let result = client.table("posts")
///     .where_("status", "==", "published")
///     .or_(|q| q.where_("status", "==", "draft"))
///     .order_by("createdAt", "desc")
///     .limit(20)
///     .get_list()
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct TableRef {
    /// Shared HTTP client used for every request issued through this reference.
    http: Arc<HttpClient>,
    /// Table name.
    name: String,
    /// DB block namespace: 'shared' | 'workspace' | 'user' | ...
    namespace: String,
    /// DB instance ID for dynamic DOs (e.g. 'ws-456'). None for static DBs.
    instance_id: Option<String>,
    /// `(field, operator, value)` conditions added by `where_()`; sent as the `filter` parameter.
    filters: Vec<FilterTuple>,
    /// Conditions collected by `or_()`; sent as the `orFilter` parameter.
    or_filters: Vec<FilterTuple>,
    /// `[field, direction]` pairs added by `order_by()`; sent as the `sort` parameter.
    sorts: Vec<[String; 2]>,
    /// Value for the `limit` query parameter, if set.
    limit_val: Option<u32>,
    /// Value for the `offset` query parameter, if set.
    offset_val: Option<u32>,
    /// Value for the `page` query parameter, if set.
    page_val: Option<u32>,
    /// Search text; when set, `get_list()` dispatches to the search operation.
    search_val: Option<String>,
    /// Value for the `after` cursor parameter; cannot be combined with `page`/`offset`.
    after_val: Option<String>,
    /// Value for the `before` cursor parameter; cannot be combined with `page`/`offset`.
    before_val: Option<String>,
}
222
223pub struct OrBuilder {
224    pub filters: Vec<FilterTuple>,
225}
226
227impl OrBuilder {
228    pub fn new() -> Self {
229        Self { filters: vec![] }
230    }
231
232    pub fn where_<V: Serialize>(mut self, field: &str, op: &str, value: V) -> Self {
233        self.filters.push((
234            field.into(),
235            op.into(),
236            serde_json::to_value(value).unwrap_or(Value::Null),
237        ));
238        self
239    }
240}
241
242impl TableRef {
    /// Create a TableRef for table `name` in the `"shared"` namespace with no
    /// instance ID; shorthand for `with_db(http, name, "shared", None)`.
    pub fn new(http: Arc<HttpClient>, name: &str) -> Self {
        Self::with_db(http, name, "shared", None)
    }
246
247    /// Create a TableRef for a specific DB namespace + optional instance ID.
248    pub fn with_db(http: Arc<HttpClient>, name: &str, namespace: &str, instance_id: Option<&str>) -> Self {
249        Self {
250            http,
251            name: name.to_string(),
252            namespace: namespace.to_string(),
253            instance_id: instance_id.map(|s| s.to_string()),
254            filters: vec![],
255            or_filters: vec![],
256            sorts: vec![],
257            limit_val: None,
258            offset_val: None,
259            page_val: None,
260            search_val: None,
261            after_val: None,
262            before_val: None,
263        }
264    }
265
266    // ── Query Builder ────────────────────────────────────────────
267
268    pub fn where_<V: Serialize>(self, field: &str, op: &str, value: V) -> Self {
269        let mut c = self;
270        c.filters.push((
271            field.into(),
272            op.into(),
273            serde_json::to_value(value).unwrap_or(Value::Null),
274        ));
275        c
276    }
277
278    pub fn or_(self, builder_fn: impl FnOnce(OrBuilder) -> OrBuilder) -> Self {
279        let mut c = self;
280        let builder = builder_fn(OrBuilder::new());
281        c.or_filters.extend(builder.filters);
282        c
283    }
284
285    pub fn order_by(self, field: &str, direction: &str) -> Self {
286        let mut c = self;
287        c.sorts.push([field.into(), direction.into()]);
288        c
289    }
290
291    pub fn limit(mut self, n: u32) -> Self { self.limit_val = Some(n); self }
292    pub fn offset(mut self, n: u32) -> Self { self.offset_val = Some(n); self }
293    pub fn page(mut self, n: u32) -> Self { self.page_val = Some(n); self }
294    pub fn search(mut self, q: &str) -> Self { self.search_val = Some(q.into()); self }
295    pub fn after(mut self, cursor: &str) -> Self { self.after_val = Some(cursor.into()); self }
296    pub fn before(mut self, cursor: &str) -> Self { self.before_val = Some(cursor.into()); self }
297
    /// Collection name: the table name this reference was created with.
    pub fn name(&self) -> &str { &self.name }
300
    /// Create a temporary GeneratedDbApi for dispatching calls.
    ///
    /// The returned value borrows this reference's HTTP client, so it cannot
    /// outlive `self`.
    fn core(&self) -> GeneratedDbApi<'_> {
        GeneratedDbApi::new(&self.http)
    }
305
306    fn validate_query_state(&self) -> Result<(), Error> {
307        let has_cursor = self.after_val.is_some() || self.before_val.is_some();
308        let has_offset = self.offset_val.is_some() || self.page_val.is_some();
309        if has_cursor && has_offset {
310            return Err(Error::Api {
311                status: 400,
312                message: "Cannot use page()/offset() with after()/before() — choose offset or cursor pagination".to_string(),
313            });
314        }
315        Ok(())
316    }
317
    /// Instance ID as an Option<&str> for dispatch helpers.
    ///
    /// `None` makes the dispatch helpers pick the single-instance core methods.
    fn iid(&self) -> Option<&str> {
        self.instance_id.as_deref()
    }
322
323    /// Build query parameters from current state as a HashMap.
324    fn build_query_params(&self) -> HashMap<String, String> {
325        let mut params = HashMap::new();
326        if !self.filters.is_empty() {
327            params.insert("filter".to_string(), serde_json::to_string(&self.filters).unwrap_or_default());
328        }
329        if !self.or_filters.is_empty() {
330            params.insert("orFilter".to_string(), serde_json::to_string(&self.or_filters).unwrap_or_default());
331        }
332        if !self.sorts.is_empty() {
333            let sort_str = self.sorts.iter()
334                .map(|s| format!("{}:{}", s[0], s[1]))
335                .collect::<Vec<_>>()
336                .join(",");
337            params.insert("sort".to_string(), sort_str);
338        }
339        if let Some(v) = self.limit_val   { params.insert("limit".to_string(), v.to_string()); }
340        if let Some(v) = self.offset_val  { params.insert("offset".to_string(), v.to_string()); }
341        if let Some(v) = self.page_val    { params.insert("page".to_string(), v.to_string()); }
342        if let Some(ref v) = self.search_val { params.insert("search".to_string(), v.clone()); }
343        if let Some(ref v) = self.after_val  { params.insert("after".to_string(), v.clone()); }
344        if let Some(ref v) = self.before_val { params.insert("before".to_string(), v.clone()); }
345        params
346    }
347
348    // ── CRUD ─────────────────────────────────────────────────────
349
350    /// List documents matching current filters/sorts.
351    pub async fn get_list(&self) -> Result<Value, Error> {
352        self.validate_query_state()?;
353        let query = self.build_query_params();
354        let core = self.core();
355        if self.search_val.is_some() {
356            core_get(&core, "search", &self.namespace, self.iid(), &self.name, None, &query).await
357        } else {
358            core_get(&core, "list", &self.namespace, self.iid(), &self.name, None, &query).await
359        }
360    }
361
    /// Alias for get_list() to match SDK parity across runtimes.
    ///
    /// Returns exactly what `get_list()` returns, including its errors.
    pub async fn get(&self) -> Result<Value, Error> {
        self.get_list().await
    }
366
367    /// Get the first record matching the current query conditions.
368    /// Returns `Ok(Value::Null)` if no records match.
369    pub async fn get_first(&self) -> Result<Value, Error> {
370        let result = self.clone().limit(1).get_list().await?;
371        let items = result.get("items").and_then(|i| i.as_array());
372        match items.and_then(|arr| arr.first()) {
373            Some(item) => Ok(item.clone()),
374            None => Ok(Value::Null),
375        }
376    }
377
378    /// Execute admin SQL scoped to this table's database namespace.
379    pub async fn sql(&self, query: &str, params: &[Value]) -> Result<Vec<Value>, Error> {
380        let mut body = json!({
381            "namespace": self.namespace,
382            "sql": query,
383            "params": params,
384        });
385        if let Some(instance_id) = self.iid() {
386            body["id"] = json!(instance_id);
387        }
388        let result = self.http.post("/api/sql", &body).await?;
389        Ok(result
390            .get("items")
391            .and_then(|items| items.as_array())
392            .cloned()
393            .unwrap_or_default())
394    }
395
396    /// Fetch a single document by ID.
397    pub async fn get_one(&self, id: &str) -> Result<Value, Error> {
398        let core = self.core();
399        let query = HashMap::new();
400        core_get(&core, "get", &self.namespace, self.iid(), &self.name, Some(id), &query).await
401    }
402
403    /// Create a new document.
404    pub async fn insert(&self, record: &Value) -> Result<Value, Error> {
405        let core = self.core();
406        let query = HashMap::new();
407        core_insert(&core, &self.namespace, self.iid(), &self.name, record, &query).await
408    }
409
410    /// Update a document by ID.
411    pub async fn update(&self, id: &str, data: &Value) -> Result<Value, Error> {
412        let core = self.core();
413        core_update(&core, &self.namespace, self.iid(), &self.name, id, data).await
414    }
415
416    /// Delete a document by ID.
417    pub async fn delete(&self, id: &str) -> Result<Value, Error> {
418        let core = self.core();
419        core_delete(&core, &self.namespace, self.iid(), &self.name, id).await
420    }
421
422    /// Create or update a document.
423    pub async fn upsert(&self, record: &Value, conflict_target: Option<&str>) -> Result<Value, Error> {
424        let core = self.core();
425        let mut query = HashMap::new();
426        query.insert("upsert".to_string(), "true".to_string());
427        if let Some(ct) = conflict_target {
428            query.insert("conflictTarget".to_string(), ct.to_string());
429        }
430        core_insert(&core, &self.namespace, self.iid(), &self.name, record, &query).await
431    }
432
433    /// Count documents matching current filters.
434    pub async fn count(&self) -> Result<u64, Error> {
435        self.validate_query_state()?;
436        let query = self.build_query_params();
437        let core = self.core();
438        let result = core_get(&core, "count", &self.namespace, self.iid(), &self.name, None, &query).await?;
439        Ok(result.get("total").and_then(|v| v.as_u64()).unwrap_or(0))
440    }
441
442    // ── Batch ────────────────────────────────────────────────────
443
444    /// Create multiple documents in server-side batches.
445    pub async fn insert_many(&self, records: Vec<Value>) -> Result<Value, Error> {
446        let core = self.core();
447        let query = HashMap::new();
448        if records.len() <= 500 {
449            let body = json!({ "inserts": records });
450            return core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await;
451        }
452
453        let mut inserted = Vec::new();
454        for chunk in records.chunks(500) {
455            let body = json!({ "inserts": chunk });
456            let result = core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await?;
457            inserted.extend(
458                result
459                    .get("inserted")
460                    .and_then(Value::as_array)
461                    .cloned()
462                    .unwrap_or_default(),
463            );
464        }
465        Ok(json!({ "inserted": inserted }))
466    }
467
468    /// Upsert multiple documents in server-side batches.
469    pub async fn upsert_many(&self, records: Vec<Value>, conflict_target: Option<&str>) -> Result<Value, Error> {
470        let core = self.core();
471        let mut query = HashMap::new();
472        query.insert("upsert".to_string(), "true".to_string());
473        if let Some(ct) = conflict_target {
474            query.insert("conflictTarget".to_string(), ct.to_string());
475        }
476        if records.len() <= 500 {
477            let body = json!({ "inserts": records });
478            return core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await;
479        }
480
481        let mut inserted = Vec::new();
482        for chunk in records.chunks(500) {
483            let body = json!({ "inserts": chunk });
484            let result = core_batch(&core, &self.namespace, self.iid(), &self.name, &body, &query).await?;
485            inserted.extend(
486                result
487                    .get("inserted")
488                    .and_then(Value::as_array)
489                    .cloned()
490                    .unwrap_or_default(),
491            );
492        }
493        Ok(json!({ "inserted": inserted }))
494    }
495
496    /// Update all documents matching current filters.
497    pub async fn update_many(&self, update: &Value) -> Result<Value, Error> {
498        if self.filters.is_empty() {
499            return Err(Error::Api {
500                status: 400,
501                message: "update_many requires at least one where() filter".to_string(),
502            });
503        }
504        let core = self.core();
505        let mut body = json!({
506            "action": "update",
507            "filter": self.filters,
508            "update": update
509        });
510        if !self.or_filters.is_empty() {
511            body.as_object_mut().unwrap().insert("orFilter".to_string(), json!(self.or_filters));
512        }
513        let query = HashMap::new();
514        core_batch_by_filter(&core, &self.namespace, self.iid(), &self.name, &body, &query).await
515    }
516
517    /// Delete all documents matching current filters.
518    pub async fn delete_many(&self) -> Result<Value, Error> {
519        if self.filters.is_empty() {
520            return Err(Error::Api {
521                status: 400,
522                message: "delete_many requires at least one where() filter".to_string(),
523            });
524        }
525        let core = self.core();
526        let mut body = json!({
527            "action": "delete",
528            "filter": self.filters
529        });
530        if !self.or_filters.is_empty() {
531            body.as_object_mut().unwrap().insert("orFilter".to_string(), json!(self.or_filters));
532        }
533        let query = HashMap::new();
534        core_batch_by_filter(&core, &self.namespace, self.iid(), &self.name, &body, &query).await
535    }
536
537    /// Returns a document-scoped helper for record operations by id.
538    pub fn doc(&self, id: &str) -> DocRef {
539        DocRef {
540            table: self.clone(),
541            id: id.to_string(),
542        }
543    }
544}
545
546#[derive(Clone)]
547pub struct DocRef {
548    table: TableRef,
549    id: String,
550}
551
552impl DocRef {
553    pub async fn get(&self) -> Result<Value, Error> {
554        self.table.get_one(&self.id).await
555    }
556
557    pub async fn update(&self, data: &Value) -> Result<Value, Error> {
558        self.table.update(&self.id, data).await
559    }
560
561    pub async fn delete(&self) -> Result<Value, Error> {
562        self.table.delete(&self.id).await
563    }
564}