// what_core/database/d1.rs

1//! Cloudflare D1 database backend
2//!
3//! Supports two table schemas:
4//!   - **Real columns** (recommended): standard SQL tables with typed columns
5//!   - **JSON blob** (legacy): tables with `id INTEGER PRIMARY KEY, data TEXT`
6//!
//! The mode is auto-detected per table (a table whose only columns are `id` and
//! `data` is treated as a JSON blob) and cached. System tables:
8//!   - `_kv_store(key TEXT PRIMARY KEY, value TEXT)` — key-value pairs
9//!   - `_collections(name TEXT PRIMARY KEY)` — collection registry
10//!
11//! All operations go through the D1 HTTP REST API.
12
13use regex::Regex;
14use reqwest::Client;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value, json};
17use std::collections::HashMap;
18use std::sync::LazyLock;
19use std::sync::{Arc, RwLock};
20
21use super::CollectionQuery;
22use crate::Result;
23
24/// Validate that an identifier (table name, field name) is safe for SQL interpolation.
25/// Only allows alphanumeric characters and underscores.
26static SAFE_IDENTIFIER_RE: LazyLock<Regex> =
27    LazyLock::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]{0,127}$").unwrap());
28
29fn validate_identifier(name: &str) -> Result<()> {
30    if SAFE_IDENTIFIER_RE.is_match(name) {
31        Ok(())
32    } else {
33        Err(crate::Error::Data(format!(
34            "Invalid identifier: {:?}",
35            name
36        )))
37    }
38}
39
/// Layout of one table, learned from `PRAGMA table_info` and kept in the schema cache.
#[derive(Debug, Clone)]
struct TableSchema {
    /// Every column name except `id`; decides which item keys are written or searched.
    columns: Vec<String>,
    /// `true` when the table's columns are exactly `id` and `data`
    /// (the legacy JSON-blob layout).
    is_json_blob: bool,
}
48
49/// Cloudflare D1 database client
50#[derive(Clone)]
51pub struct D1Database {
52    client: Client,
53    account_id: String,
54    database_id: String,
55    api_token: String,
56    /// Cached table schemas (populated on first access per table)
57    schema_cache: Arc<RwLock<HashMap<String, TableSchema>>>,
58}
59
60/// D1 API query response
61#[derive(Deserialize)]
62struct D1Response {
63    success: bool,
64    result: Option<Vec<D1QueryResult>>,
65    errors: Option<Vec<D1Error>>,
66}
67
68#[derive(Deserialize)]
69struct D1QueryResult {
70    results: Option<Vec<Value>>,
71}
72
73#[derive(Deserialize)]
74struct D1Error {
75    message: String,
76}
77
78/// Parameters for a D1 SQL query
79#[derive(Serialize)]
80struct D1Query {
81    sql: String,
82    #[serde(skip_serializing_if = "Vec::is_empty")]
83    params: Vec<Value>,
84}
85
86impl D1Database {
87    pub fn new(account_id: &str, database_id: &str, api_token: &str) -> Self {
88        Self {
89            client: Client::new(),
90            account_id: account_id.to_string(),
91            database_id: database_id.to_string(),
92            api_token: api_token.to_string(),
93            schema_cache: Arc::new(RwLock::new(HashMap::new())),
94        }
95    }
96
97    /// Initialize system tables required by What (_kv_store and _collections).
98    /// Safe to call multiple times — uses `CREATE TABLE IF NOT EXISTS`.
99    pub async fn init(&self) -> crate::Result<()> {
100        self.execute(
101            "CREATE TABLE IF NOT EXISTS _kv_store (key TEXT PRIMARY KEY, value TEXT NOT NULL)",
102            vec![],
103        )
104        .await?;
105        self.execute(
106            "CREATE TABLE IF NOT EXISTS _collections (name TEXT PRIMARY KEY)",
107            vec![],
108        )
109        .await?;
110        Ok(())
111    }
112
113    /// Execute a SQL query against D1 and return rows
114    async fn query(&self, sql: &str, params: Vec<Value>) -> Result<Vec<Value>> {
115        let url = format!(
116            "https://api.cloudflare.com/client/v4/accounts/{}/d1/database/{}/query",
117            self.account_id, self.database_id
118        );
119
120        let body = D1Query {
121            sql: sql.to_string(),
122            params,
123        };
124
125        let resp = self
126            .client
127            .post(&url)
128            .bearer_auth(&self.api_token)
129            .json(&body)
130            .send()
131            .await
132            .map_err(|e| crate::Error::Data(format!("D1 request failed: {}", e)))?;
133
134        let status = resp.status();
135        let text = resp
136            .text()
137            .await
138            .map_err(|e| crate::Error::Data(format!("D1 response read error: {}", e)))?;
139
140        let d1_resp: D1Response = serde_json::from_str(&text).map_err(|_| {
141            crate::Error::Data(format!("D1 response parse error (status {})", status))
142        })?;
143
144        if !d1_resp.success {
145            let msg = d1_resp
146                .errors
147                .and_then(|e| e.first().map(|err| err.message.clone()))
148                .unwrap_or_else(|| "unknown D1 error".to_string());
149            return Err(crate::Error::Data(format!("D1 error: {}", msg)));
150        }
151
152        Ok(d1_resp
153            .result
154            .and_then(|r| r.into_iter().next())
155            .and_then(|r| r.results)
156            .unwrap_or_default())
157    }
158
159    /// Execute a SQL statement (INSERT, UPDATE, DELETE, CREATE TABLE)
160    async fn execute(&self, sql: &str, params: Vec<Value>) -> Result<()> {
161        self.query(sql, params).await?;
162        Ok(())
163    }
164
165    // -----------------------------------------------------------------------
166    // Schema detection
167    // -----------------------------------------------------------------------
168
169    /// Get the schema for a table, caching the result.
170    async fn get_schema(&self, table: &str) -> Result<TableSchema> {
171        // Check cache first
172        {
173            let cache = self.schema_cache.read().unwrap_or_else(|e| e.into_inner());
174            if let Some(schema) = cache.get(table) {
175                return Ok(schema.clone());
176            }
177        }
178
179        // Query PRAGMA table_info
180        let rows = self
181            .query(&format!("PRAGMA table_info(\"{}\")", table), vec![])
182            .await?;
183
184        let columns: Vec<String> = rows
185            .iter()
186            .filter_map(|row| {
187                row.get("name")
188                    .and_then(|v| v.as_str())
189                    .map(|s| s.to_string())
190            })
191            .collect();
192
193        // JSON blob mode: exactly `id` + `data` columns
194        let is_json_blob = columns.len() == 2
195            && columns.contains(&"id".to_string())
196            && columns.contains(&"data".to_string());
197
198        let non_id_columns: Vec<String> = columns.into_iter().filter(|c| c != "id").collect();
199
200        let schema = TableSchema {
201            columns: non_id_columns,
202            is_json_blob,
203        };
204
205        // Cache it
206        {
207            let mut cache = self.schema_cache.write().unwrap_or_else(|e| e.into_inner());
208            cache.insert(table.to_string(), schema.clone());
209        }
210
211        Ok(schema)
212    }
213
214    // -----------------------------------------------------------------------
215    // Row conversion (auto-detects JSON blob vs real columns)
216    // -----------------------------------------------------------------------
217
218    /// Convert a D1 row to a JSON item.
219    /// - JSON blob mode: parse `data` column as JSON, merge `id`
220    /// - Real columns mode: return the row as-is
221    fn row_to_item_with_schema(row: &Value, schema: &TableSchema) -> Value {
222        if schema.is_json_blob {
223            // Legacy JSON blob: parse data column
224            let id = row.get("id").cloned().unwrap_or(json!(0));
225            let data_str = row.get("data").and_then(|v| v.as_str()).unwrap_or("{}");
226            let mut item: Value = serde_json::from_str(data_str).unwrap_or(json!({}));
227            if let Value::Object(ref mut map) = item {
228                map.insert("id".to_string(), id);
229            }
230            item
231        } else {
232            // Real columns: return row directly
233            row.clone()
234        }
235    }
236
237    // -----------------------------------------------------------------------
238    // Collection operations
239    // -----------------------------------------------------------------------
240
241    pub async fn get_collection(&self, name: &str) -> Result<Vec<Value>> {
242        validate_identifier(name)?;
243        let schema = self.get_schema(name).await?;
244        let rows = self
245            .query(&format!("SELECT * FROM \"{}\"", name), vec![])
246            .await?;
247
248        Ok(rows
249            .into_iter()
250            .map(|row| Self::row_to_item_with_schema(&row, &schema))
251            .collect())
252    }
253
254    pub async fn query_collection(
255        &self,
256        name: &str,
257        query: &CollectionQuery,
258    ) -> Result<Vec<Value>> {
259        validate_identifier(name)?;
260        let schema = self.get_schema(name).await?;
261        let mut sql = format!("SELECT * FROM \"{}\"", name);
262        let mut params: Vec<Value> = Vec::new();
263        let mut conditions: Vec<String> = Vec::new();
264
265        // Build filter SQL
266        if let Some(ref filter_expr) = query.filter {
267            if let Some(filter_sql) = build_d1_filter(filter_expr, &mut params, &schema) {
268                conditions.push(filter_sql);
269            }
270        }
271
272        // Policy-forced scope filters — AND-ed in, cannot be widened by the user.
273        // On real-column (non-blob) D1 tables the forced field may not exist as
274        // a column; build_d1_filter targets that column and the query returns
275        // nothing (fail closed).
276        for forced in &query.forced_filters {
277            if let Some(filter_sql) = build_d1_filter(forced, &mut params, &schema) {
278                conditions.push(filter_sql);
279            }
280        }
281
282        // Search — for real columns, search across all text columns
283        if let Some(ref search) = query.search {
284            if !search.is_empty() {
285                let idx = params.len() + 1;
286                params.push(json!(format!("%{}%", search)));
287                if schema.is_json_blob {
288                    conditions.push(format!("data LIKE ?{}", idx));
289                } else {
290                    // Search across all non-id columns
291                    let col_searches: Vec<String> = schema
292                        .columns
293                        .iter()
294                        .map(|col| format!("CAST(\"{}\" AS TEXT) LIKE ?{}", col, idx))
295                        .collect();
296                    if !col_searches.is_empty() {
297                        conditions.push(format!("({})", col_searches.join(" OR ")));
298                    }
299                }
300            }
301        }
302
303        if !conditions.is_empty() {
304            sql.push_str(" WHERE ");
305            sql.push_str(&conditions.join(" AND "));
306        }
307
308        // Sort
309        if let Some(ref sort) = query.sort {
310            let (field, desc) = if let Some((f, d)) = sort.rsplit_once(':') {
311                (f, d.eq_ignore_ascii_case("desc"))
312            } else {
313                (sort.as_str(), false)
314            };
315            validate_identifier(field)?;
316            if schema.is_json_blob {
317                sql.push_str(&format!(
318                    " ORDER BY json_extract(data, '$.{}') {}",
319                    field,
320                    if desc { "DESC" } else { "ASC" }
321                ));
322            } else {
323                sql.push_str(&format!(
324                    " ORDER BY \"{}\" {}",
325                    field,
326                    if desc { "DESC" } else { "ASC" }
327                ));
328            }
329        }
330
331        if let Some(limit) = query.limit {
332            sql.push_str(&format!(" LIMIT {}", limit));
333        }
334        if let Some(offset) = query.offset {
335            sql.push_str(&format!(" OFFSET {}", offset));
336        }
337
338        let rows = self.query(&sql, params).await?;
339        Ok(rows
340            .into_iter()
341            .map(|row| Self::row_to_item_with_schema(&row, &schema))
342            .collect())
343    }
344
345    pub async fn find_by(
346        &self,
347        collection: &str,
348        field: &str,
349        value: &Value,
350    ) -> Result<Vec<Value>> {
351        validate_identifier(collection)?;
352        validate_identifier(field)?;
353        let schema = self.get_schema(collection).await?;
354
355        let sql = if schema.is_json_blob {
356            format!(
357                "SELECT * FROM \"{}\" WHERE json_extract(data, '$.{}') = ?1",
358                collection, field
359            )
360        } else {
361            format!("SELECT * FROM \"{}\" WHERE \"{}\" = ?1", collection, field)
362        };
363
364        let rows = self.query(&sql, vec![value.clone()]).await?;
365        Ok(rows
366            .into_iter()
367            .map(|row| Self::row_to_item_with_schema(&row, &schema))
368            .collect())
369    }
370
371    pub async fn find_one_by(
372        &self,
373        collection: &str,
374        field: &str,
375        value: &Value,
376    ) -> Result<Option<Value>> {
377        validate_identifier(collection)?;
378        validate_identifier(field)?;
379        let schema = self.get_schema(collection).await?;
380
381        let sql = if schema.is_json_blob {
382            format!(
383                "SELECT * FROM \"{}\" WHERE json_extract(data, '$.{}') = ?1 LIMIT 1",
384                collection, field
385            )
386        } else {
387            format!(
388                "SELECT * FROM \"{}\" WHERE \"{}\" = ?1 LIMIT 1",
389                collection, field
390            )
391        };
392
393        let rows = self.query(&sql, vec![value.clone()]).await?;
394        Ok(rows
395            .into_iter()
396            .next()
397            .map(|row| Self::row_to_item_with_schema(&row, &schema)))
398    }
399
400    pub async fn create(&self, collection: &str, mut item: Value) -> Result<Value> {
401        validate_identifier(collection)?;
402        let schema = self.get_schema(collection).await?;
403
404        if schema.is_json_blob {
405            // JSON blob mode: store everything in data column
406            if let Value::Object(ref mut map) = item {
407                map.remove("id");
408            }
409            let data_str = serde_json::to_string(&item)?;
410
411            let rows = self
412                .query(
413                    &format!(
414                        "INSERT INTO \"{}\" (data) VALUES (?1) RETURNING id",
415                        collection
416                    ),
417                    vec![json!(data_str)],
418                )
419                .await?;
420
421            let id = rows
422                .first()
423                .and_then(|r| r.get("id"))
424                .cloned()
425                .unwrap_or(json!(0));
426
427            if let Value::Object(ref mut map) = item {
428                map.insert("id".to_string(), id);
429            }
430            Ok(item)
431        } else {
432            // Real columns mode: insert into individual columns
433            if let Value::Object(ref mut map) = item {
434                map.remove("id");
435            }
436
437            let mut col_names = Vec::new();
438            let mut placeholders = Vec::new();
439            let mut params = Vec::new();
440
441            if let Value::Object(ref map) = item {
442                let mut param_idx = 1;
443                for (key, val) in map.iter() {
444                    if schema.columns.contains(key) {
445                        col_names.push(format!("\"{}\"", key));
446                        placeholders.push(format!("?{}", param_idx));
447                        params.push(val.clone());
448                        param_idx += 1;
449                    }
450                }
451            }
452
453            let sql = format!(
454                "INSERT INTO \"{}\" ({}) VALUES ({}) RETURNING *",
455                collection,
456                col_names.join(", "),
457                placeholders.join(", ")
458            );
459
460            let rows = self.query(&sql, params).await?;
461            Ok(rows.into_iter().next().unwrap_or(item))
462        }
463    }
464
465    pub async fn update(
466        &self,
467        collection: &str,
468        id: &Value,
469        updates: Value,
470    ) -> Result<Option<Value>> {
471        validate_identifier(collection)?;
472        let schema = self.get_schema(collection).await?;
473
474        if schema.is_json_blob {
475            // JSON blob mode: read-merge-write
476            let rows = self
477                .query(
478                    &format!("SELECT * FROM \"{}\" WHERE id = ?1", collection),
479                    vec![id.clone()],
480                )
481                .await?;
482
483            let row = match rows.into_iter().next() {
484                Some(r) => r,
485                None => return Ok(None),
486            };
487
488            let mut current = Self::row_to_item_with_schema(&row, &schema);
489
490            if let (Value::Object(map), Value::Object(updates_map)) = (&mut current, &updates) {
491                for (k, v) in updates_map {
492                    map.insert(k.clone(), v.clone());
493                }
494            }
495
496            let mut data_for_storage = current.clone();
497            if let Value::Object(ref mut map) = data_for_storage {
498                map.remove("id");
499            }
500            let data_str = serde_json::to_string(&data_for_storage)?;
501
502            self.execute(
503                &format!("UPDATE \"{}\" SET data = ?1 WHERE id = ?2", collection),
504                vec![json!(data_str), id.clone()],
505            )
506            .await?;
507
508            Ok(Some(current))
509        } else {
510            // Real columns mode: SET individual columns
511            let mut set_clauses = Vec::new();
512            let mut params = Vec::new();
513            let mut idx = 1;
514
515            if let Value::Object(ref map) = updates {
516                for (key, val) in map {
517                    if key != "id" && schema.columns.contains(key) {
518                        set_clauses.push(format!("\"{}\" = ?{}", key, idx));
519                        params.push(val.clone());
520                        idx += 1;
521                    }
522                }
523            }
524
525            if set_clauses.is_empty() {
526                return Ok(None);
527            }
528
529            params.push(id.clone());
530            let sql = format!(
531                "UPDATE \"{}\" SET {} WHERE id = ?{} RETURNING *",
532                collection,
533                set_clauses.join(", "),
534                idx
535            );
536
537            let rows = self.query(&sql, params).await?;
538            Ok(rows.into_iter().next())
539        }
540    }
541
542    pub async fn delete(&self, collection: &str, id: &Value) -> Result<bool> {
543        validate_identifier(collection)?;
544        let rows = self
545            .query(
546                &format!("SELECT id FROM \"{}\" WHERE id = ?1", collection),
547                vec![id.clone()],
548            )
549            .await?;
550
551        if rows.is_empty() {
552            return Ok(false);
553        }
554
555        self.execute(
556            &format!("DELETE FROM \"{}\" WHERE id = ?1", collection),
557            vec![id.clone()],
558        )
559        .await?;
560
561        Ok(true)
562    }
563
564    // -----------------------------------------------------------------------
565    // Key-value operations
566    // -----------------------------------------------------------------------
567
568    pub async fn set(&self, key: &str, value: Value) -> Result<()> {
569        let value_str = serde_json::to_string(&value)?;
570        self.execute(
571            "INSERT OR REPLACE INTO _kv_store (key, value) VALUES (?1, ?2)",
572            vec![json!(key), json!(value_str)],
573        )
574        .await
575    }
576
577    pub async fn get(&self, key: &str) -> Result<Option<Value>> {
578        let rows = self
579            .query(
580                "SELECT value FROM _kv_store WHERE key = ?1",
581                vec![json!(key)],
582            )
583            .await?;
584
585        Ok(rows.into_iter().next().and_then(|row| {
586            row.get("value")
587                .and_then(|v| v.as_str())
588                .and_then(|s| serde_json::from_str(s).ok())
589        }))
590    }
591
592    pub async fn remove(&self, key: &str) -> Result<Option<Value>> {
593        let existing = self.get(key).await?;
594        if existing.is_some() {
595            self.execute("DELETE FROM _kv_store WHERE key = ?1", vec![json!(key)])
596                .await?;
597        }
598        Ok(existing)
599    }
600
601    pub async fn atomic_modify<F>(&self, key: &str, f: F) -> Result<Value>
602    where
603        F: FnOnce(Option<&Value>) -> Value,
604    {
605        let current = self.get(key).await?;
606        let new_value = f(current.as_ref());
607        self.set(key, new_value.clone()).await?;
608        Ok(new_value)
609    }
610
611    // -----------------------------------------------------------------------
612    // Context & bulk operations
613    // -----------------------------------------------------------------------
614
615    pub async fn as_context(&self) -> Result<HashMap<String, Value>> {
616        let mut context = HashMap::new();
617
618        let collections = self.query("SELECT name FROM _collections", vec![]).await?;
619
620        for row in collections {
621            if let Some(name) = row.get("name").and_then(|v| v.as_str()) {
622                if let Ok(items) = self.get_collection(name).await {
623                    context.insert(name.to_string(), json!(items));
624                }
625            }
626        }
627
628        let kv_rows = self
629            .query("SELECT key, value FROM _kv_store", vec![])
630            .await?;
631        for row in kv_rows {
632            if let (Some(key), Some(value_str)) = (
633                row.get("key").and_then(|v| v.as_str()),
634                row.get("value").and_then(|v| v.as_str()),
635            ) {
636                if let Ok(value) = serde_json::from_str::<Value>(value_str) {
637                    context.insert(key.to_string(), value);
638                }
639            }
640        }
641
642        Ok(context)
643    }
644
645    /// Clear the cached table schemas, forcing re-detection on next access.
646    /// Call this after `ALTER TABLE` or schema changes.
647    pub fn invalidate_schema_cache(&self) {
648        let mut cache = self.schema_cache.write().unwrap_or_else(|e| e.into_inner());
649        cache.clear();
650    }
651
652    pub async fn set_collection(&self, name: &str, items: Vec<Value>) -> Result<()> {
653        validate_identifier(name)?;
654        self.execute(&format!("DELETE FROM \"{}\"", name), vec![])
655            .await?;
656        for item in items {
657            self.create(name, item).await?;
658        }
659        Ok(())
660    }
661}
662
663/// Build a SQL WHERE clause from a filter expression (schema-aware)
664fn build_d1_filter(
665    filter_expr: &str,
666    params: &mut Vec<Value>,
667    schema: &TableSchema,
668) -> Option<String> {
669    let or_groups: Vec<&str> = filter_expr.split(',').collect();
670    let mut or_parts: Vec<String> = Vec::new();
671
672    for group in or_groups {
673        let and_conditions: Vec<&str> = group.split('&').collect();
674        let mut and_parts: Vec<String> = Vec::new();
675
676        for cond in and_conditions {
677            let cond = cond.trim();
678            if let Some(sql) = build_d1_condition(cond, params, schema) {
679                and_parts.push(sql);
680            }
681        }
682
683        if !and_parts.is_empty() {
684            or_parts.push(if and_parts.len() == 1 {
685                and_parts.into_iter().next().unwrap()
686            } else {
687                format!("({})", and_parts.join(" AND "))
688            });
689        }
690    }
691
692    if or_parts.is_empty() {
693        None
694    } else if or_parts.len() == 1 {
695        Some(or_parts.into_iter().next().unwrap())
696    } else {
697        Some(format!("({})", or_parts.join(" OR ")))
698    }
699}
700
701fn build_d1_condition(cond: &str, params: &mut Vec<Value>, schema: &TableSchema) -> Option<String> {
702    let operators = [">=", "<=", ">", "<", "="];
703    for op in operators {
704        if let Some((field, val)) = cond.split_once(op) {
705            let field = field.trim();
706            let val = val.trim();
707            if !SAFE_IDENTIFIER_RE.is_match(field) {
708                return None;
709            }
710            let idx = params.len() + 1;
711            params.push(json!(val));
712            if schema.is_json_blob {
713                return Some(format!("json_extract(data, '$.{}') {} ?{}", field, op, idx));
714            } else {
715                return Some(format!("\"{}\" {} ?{}", field, op, idx));
716            }
717        }
718    }
719    None
720}