Skip to main content

pylon_workers/
d1_store.rs

1//! [`DataStore`] implementation backed by Cloudflare D1.
2//!
3//! D1 speaks SQLite SQL, so the SQL generation here mirrors
4//! `pylon-storage::sqlite`. The `D1Executor` trait abstracts the actual
5//! execution layer so this module stays free of `worker` crate dependencies.
6//! The Workers fetch handler provides a concrete `D1Executor` that delegates
7//! to the real D1 bindings.
8
9use pylon_http::{DataError, DataStore};
10use pylon_kernel::{AppManifest, ManifestEntity};
11use serde_json::Value;
12
13// ---------------------------------------------------------------------------
14// Executor trait — abstracts D1 vs tests
15// ---------------------------------------------------------------------------
16
17/// Executes prepared SQL statements against D1.
18///
19/// Implementations bridge synchronous [`DataStore`] calls to D1's async API
20/// by blocking in the Workers request context (which is single-threaded).
21pub trait D1Executor: Send + Sync {
22    /// Execute a statement that doesn't return rows. Returns affected row count.
23    fn execute(&self, sql: &str, params: &[Value]) -> Result<u64, String>;
24
25    /// Execute a query and return all matching rows as JSON objects.
26    fn query(&self, sql: &str, params: &[Value]) -> Result<Vec<Value>, String>;
27
28    /// Execute a query expecting at most one row.
29    fn query_one(&self, sql: &str, params: &[Value]) -> Result<Option<Value>, String> {
30        let rows = self.query(sql, params)?;
31        Ok(rows.into_iter().next())
32    }
33}
34
35// ---------------------------------------------------------------------------
36// D1DataStore
37// ---------------------------------------------------------------------------
38
39pub struct D1DataStore<E: D1Executor> {
40    executor: E,
41    manifest: AppManifest,
42}
43
44impl<E: D1Executor> D1DataStore<E> {
45    pub fn new(executor: E, manifest: AppManifest) -> Self {
46        Self { executor, manifest }
47    }
48
49    fn entity(&self, name: &str) -> Result<&ManifestEntity, DataError> {
50        self.manifest
51            .entities
52            .iter()
53            .find(|e| e.name == name)
54            .ok_or_else(|| DataError {
55                code: "ENTITY_NOT_FOUND".into(),
56                message: format!("Unknown entity: \"{name}\""),
57            })
58    }
59
60    fn validate_column(&self, entity: &ManifestEntity, col: &str) -> Result<(), DataError> {
61        if col == "id" || entity.fields.iter().any(|f| f.name == col) {
62            Ok(())
63        } else {
64            Err(DataError {
65                code: "INVALID_COLUMN".into(),
66                message: format!("Unknown column \"{col}\" on entity \"{}\"", entity.name),
67            })
68        }
69    }
70}
71
72use pylon_kernel::util::quote_ident;
73
74fn generate_id() -> String {
75    // Simple time-based ID. D1 runs in isolates with precise time.
76    let now = std::time::SystemTime::now()
77        .duration_since(std::time::UNIX_EPOCH)
78        .unwrap_or_default()
79        .as_nanos();
80    format!("{:x}", now)
81}
82
83// ---------------------------------------------------------------------------
84// DataStore impl
85// ---------------------------------------------------------------------------
86
87impl<E: D1Executor> DataStore for D1DataStore<E> {
88    fn manifest(&self) -> &AppManifest {
89        &self.manifest
90    }
91
92    fn insert(&self, entity: &str, data: &Value) -> Result<String, DataError> {
93        let ent = self.entity(entity)?;
94        let obj = data.as_object().ok_or_else(|| DataError {
95            code: "INVALID_DATA".into(),
96            message: "Insert data must be a JSON object".into(),
97        })?;
98
99        let id = generate_id();
100        let mut cols = vec![quote_ident("id")];
101        let mut placeholders = vec!["?1".to_string()];
102        let mut params: Vec<Value> = vec![Value::String(id.clone())];
103        let mut idx = 2;
104
105        for (k, v) in obj {
106            if k == "id" {
107                continue;
108            }
109            self.validate_column(ent, k)?;
110            cols.push(quote_ident(k));
111            placeholders.push(format!("?{idx}"));
112            params.push(v.clone());
113            idx += 1;
114        }
115
116        let sql = format!(
117            "INSERT INTO {} ({}) VALUES ({})",
118            quote_ident(entity),
119            cols.join(", "),
120            placeholders.join(", ")
121        );
122
123        self.executor
124            .execute(&sql, &params)
125            .map_err(|e| DataError {
126                code: "INSERT_FAILED".into(),
127                message: e,
128            })?;
129
130        Ok(id)
131    }
132
133    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<Value>, DataError> {
134        let _ = self.entity(entity)?;
135        let sql = format!(
136            "SELECT * FROM {} WHERE \"id\" = ?1 LIMIT 1",
137            quote_ident(entity)
138        );
139        self.executor
140            .query_one(&sql, &[Value::String(id.to_string())])
141            .map_err(|e| DataError {
142                code: "QUERY_FAILED".into(),
143                message: e,
144            })
145    }
146
147    fn list(&self, entity: &str) -> Result<Vec<Value>, DataError> {
148        let _ = self.entity(entity)?;
149        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
150        self.executor.query(&sql, &[]).map_err(|e| DataError {
151            code: "QUERY_FAILED".into(),
152            message: e,
153        })
154    }
155
156    fn list_after(
157        &self,
158        entity: &str,
159        after: Option<&str>,
160        limit: usize,
161    ) -> Result<Vec<Value>, DataError> {
162        let _ = self.entity(entity)?;
163        let (sql, params): (String, Vec<Value>) = match after {
164            Some(cursor) => (
165                format!(
166                    "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
167                    quote_ident(entity)
168                ),
169                vec![
170                    Value::String(cursor.to_string()),
171                    Value::Number((limit as u64).into()),
172                ],
173            ),
174            None => (
175                format!(
176                    "SELECT * FROM {} ORDER BY \"id\" LIMIT ?1",
177                    quote_ident(entity)
178                ),
179                vec![Value::Number((limit as u64).into())],
180            ),
181        };
182
183        self.executor.query(&sql, &params).map_err(|e| DataError {
184            code: "QUERY_FAILED".into(),
185            message: e,
186        })
187    }
188
189    fn update(&self, entity: &str, id: &str, data: &Value) -> Result<bool, DataError> {
190        let ent = self.entity(entity)?;
191        let obj = data.as_object().ok_or_else(|| DataError {
192            code: "INVALID_DATA".into(),
193            message: "Update data must be a JSON object".into(),
194        })?;
195
196        let mut sets = Vec::new();
197        let mut params: Vec<Value> = Vec::new();
198        let mut idx = 1;
199        for (k, v) in obj {
200            if k == "id" {
201                continue;
202            }
203            self.validate_column(ent, k)?;
204            sets.push(format!("{} = ?{idx}", quote_ident(k)));
205            params.push(v.clone());
206            idx += 1;
207        }
208        if sets.is_empty() {
209            return Ok(false);
210        }
211        params.push(Value::String(id.to_string()));
212        let sql = format!(
213            "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
214            quote_ident(entity),
215            sets.join(", ")
216        );
217        let affected = self
218            .executor
219            .execute(&sql, &params)
220            .map_err(|e| DataError {
221                code: "UPDATE_FAILED".into(),
222                message: e,
223            })?;
224        Ok(affected > 0)
225    }
226
227    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
228        let _ = self.entity(entity)?;
229        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
230        let affected = self
231            .executor
232            .execute(&sql, &[Value::String(id.to_string())])
233            .map_err(|e| DataError {
234                code: "DELETE_FAILED".into(),
235                message: e,
236            })?;
237        Ok(affected > 0)
238    }
239
240    fn lookup(&self, entity: &str, field: &str, value: &str) -> Result<Option<Value>, DataError> {
241        let ent = self.entity(entity)?;
242        self.validate_column(ent, field)?;
243        let sql = format!(
244            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
245            quote_ident(entity),
246            quote_ident(field)
247        );
248        self.executor
249            .query_one(&sql, &[Value::String(value.to_string())])
250            .map_err(|e| DataError {
251                code: "QUERY_FAILED".into(),
252                message: e,
253            })
254    }
255
256    fn link(
257        &self,
258        entity: &str,
259        id: &str,
260        relation: &str,
261        target_id: &str,
262    ) -> Result<bool, DataError> {
263        let ent = self.entity(entity)?;
264        let rel = ent
265            .relations
266            .iter()
267            .find(|r| r.name == relation)
268            .ok_or_else(|| DataError {
269                code: "RELATION_NOT_FOUND".into(),
270                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
271            })?;
272        let data = serde_json::json!({ rel.field.clone(): target_id });
273        self.update(entity, id, &data)
274    }
275
276    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
277        let ent = self.entity(entity)?;
278        let rel = ent
279            .relations
280            .iter()
281            .find(|r| r.name == relation)
282            .ok_or_else(|| DataError {
283                code: "RELATION_NOT_FOUND".into(),
284                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
285            })?;
286        let data = serde_json::json!({ rel.field.clone(): Value::Null });
287        self.update(entity, id, &data)
288    }
289
290    fn query_filtered(&self, entity: &str, filter: &Value) -> Result<Vec<Value>, DataError> {
291        let ent = self.entity(entity)?;
292        let empty = serde_json::Map::new();
293        let obj = filter.as_object().unwrap_or(&empty);
294
295        let mut where_clauses: Vec<String> = Vec::new();
296        let mut params: Vec<Value> = Vec::new();
297        let mut order_clause = String::new();
298        let mut limit_clause = String::new();
299        let mut idx = 1;
300
301        for (k, v) in obj {
302            match k.as_str() {
303                "$order" => {
304                    if let Some(o) = v.as_object() {
305                        let mut parts = Vec::new();
306                        for (col, dir) in o {
307                            self.validate_column(ent, col)?;
308                            let d = match dir.as_str().unwrap_or("asc") {
309                                "desc" | "DESC" => "DESC",
310                                _ => "ASC",
311                            };
312                            parts.push(format!("{} {d}", quote_ident(col)));
313                        }
314                        if !parts.is_empty() {
315                            order_clause = format!(" ORDER BY {}", parts.join(", "));
316                        }
317                    }
318                }
319                "$limit" => {
320                    if let Some(n) = v.as_u64() {
321                        limit_clause = format!(" LIMIT {n}");
322                    }
323                }
324                _ => {
325                    self.validate_column(ent, k)?;
326                    let qk = quote_ident(k);
327                    if let Some(op_obj) = v.as_object() {
328                        for (op, op_val) in op_obj {
329                            match op.as_str() {
330                                "$not" => {
331                                    where_clauses.push(format!("{qk} != ?{idx}"));
332                                    params.push(op_val.clone());
333                                    idx += 1;
334                                }
335                                "$gt" => {
336                                    where_clauses.push(format!("{qk} > ?{idx}"));
337                                    params.push(op_val.clone());
338                                    idx += 1;
339                                }
340                                "$gte" => {
341                                    where_clauses.push(format!("{qk} >= ?{idx}"));
342                                    params.push(op_val.clone());
343                                    idx += 1;
344                                }
345                                "$lt" => {
346                                    where_clauses.push(format!("{qk} < ?{idx}"));
347                                    params.push(op_val.clone());
348                                    idx += 1;
349                                }
350                                "$lte" => {
351                                    where_clauses.push(format!("{qk} <= ?{idx}"));
352                                    params.push(op_val.clone());
353                                    idx += 1;
354                                }
355                                "$like" => {
356                                    where_clauses.push(format!("{qk} LIKE ?{idx}"));
357                                    let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
358                                    params.push(Value::String(pattern));
359                                    idx += 1;
360                                }
361                                "$in" => {
362                                    if let Some(arr) = op_val.as_array() {
363                                        let ph: Vec<String> = arr
364                                            .iter()
365                                            .map(|v| {
366                                                let p = format!("?{idx}");
367                                                params.push(v.clone());
368                                                idx += 1;
369                                                p
370                                            })
371                                            .collect();
372                                        if !ph.is_empty() {
373                                            where_clauses
374                                                .push(format!("{qk} IN ({})", ph.join(", ")));
375                                        }
376                                    }
377                                }
378                                _ => {}
379                            }
380                        }
381                    } else {
382                        where_clauses.push(format!("{qk} = ?{idx}"));
383                        params.push(v.clone());
384                        idx += 1;
385                    }
386                }
387            }
388        }
389
390        let where_sql = if where_clauses.is_empty() {
391            String::new()
392        } else {
393            format!(" WHERE {}", where_clauses.join(" AND "))
394        };
395        if order_clause.is_empty() {
396            order_clause = " ORDER BY \"id\"".into();
397        }
398
399        let sql = format!(
400            "SELECT * FROM {}{}{}{}",
401            quote_ident(entity),
402            where_sql,
403            order_clause,
404            limit_clause
405        );
406
407        self.executor.query(&sql, &params).map_err(|e| DataError {
408            code: "QUERY_FAILED".into(),
409            message: e,
410        })
411    }
412
413    fn query_graph(&self, query: &Value) -> Result<Value, DataError> {
414        let obj = query.as_object().ok_or_else(|| DataError {
415            code: "INVALID_QUERY".into(),
416            message: "Graph query must be a JSON object".into(),
417        })?;
418        let mut results = serde_json::Map::new();
419        for (entity_name, opts) in obj {
420            let filter = opts.get("where").cloned().unwrap_or(serde_json::json!({}));
421            let rows = self.query_filtered(entity_name, &filter)?;
422            results.insert(entity_name.clone(), Value::Array(rows));
423        }
424        Ok(Value::Object(results))
425    }
426
427    fn transact(&self, ops: &[Value]) -> Result<(bool, Vec<Value>), DataError> {
428        // D1 doesn't have real transactions from the Worker API — it has
429        // a batch API that runs in a single trip. For now, execute ops
430        // sequentially and short-circuit on error (no real rollback).
431        let mut results = Vec::new();
432        let mut rollback = false;
433        for op in ops {
434            let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
435            let entity = op.get("entity").and_then(|v| v.as_str()).unwrap_or("");
436            match op_type {
437                "insert" => {
438                    let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
439                    match self.insert(entity, &data) {
440                        Ok(id) => results.push(serde_json::json!({"op":"insert","id":id})),
441                        Err(e) => {
442                            rollback = true;
443                            results.push(serde_json::json!({"op":"insert","error":e.message}));
444                            break;
445                        }
446                    }
447                }
448                "update" => {
449                    let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
450                    let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
451                    match self.update(entity, id, &data) {
452                        Ok(_) => results.push(serde_json::json!({"op":"update","id":id})),
453                        Err(e) => {
454                            rollback = true;
455                            results.push(serde_json::json!({"op":"update","error":e.message}));
456                            break;
457                        }
458                    }
459                }
460                "delete" => {
461                    let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
462                    match self.delete(entity, id) {
463                        Ok(_) => results.push(serde_json::json!({"op":"delete","id":id})),
464                        Err(e) => {
465                            rollback = true;
466                            results.push(serde_json::json!({"op":"delete","error":e.message}));
467                            break;
468                        }
469                    }
470                }
471                _ => {
472                    results.push(serde_json::json!({"op":op_type,"error":"unknown operation"}));
473                }
474            }
475        }
476        Ok((!rollback, results))
477    }
478}
479
480// ---------------------------------------------------------------------------
481// Tests
482// ---------------------------------------------------------------------------
483
484#[cfg(test)]
485mod tests {
486    use super::*;
487    use std::sync::Mutex;
488
489    /// Minimal in-memory executor for unit tests.
490    struct MockExecutor {
491        rows: Mutex<Vec<Value>>,
492    }
493
494    impl D1Executor for MockExecutor {
495        fn execute(&self, _sql: &str, _params: &[Value]) -> Result<u64, String> {
496            Ok(1)
497        }
498        fn query(&self, _sql: &str, _params: &[Value]) -> Result<Vec<Value>, String> {
499            Ok(self.rows.lock().unwrap().clone())
500        }
501    }
502
503    fn empty_manifest() -> AppManifest {
504        AppManifest {
505            manifest_version: pylon_kernel::MANIFEST_VERSION,
506            name: "t".into(),
507            version: "0".into(),
508            entities: vec![ManifestEntity {
509                name: "Lot".into(),
510                fields: vec![pylon_kernel::ManifestField {
511                    name: "title".into(),
512                    field_type: "string".into(),
513                    optional: false,
514                    unique: false,
515                    crdt: None,
516                }],
517                indexes: vec![],
518                relations: vec![],
519                search: None,
520                crdt: true,
521            }],
522            routes: vec![],
523            queries: vec![],
524            actions: vec![],
525            policies: vec![],
526            auth: Default::default(),
527        }
528    }
529
530    #[test]
531    fn d1_insert_generates_id() {
532        let exec = MockExecutor {
533            rows: Mutex::new(vec![]),
534        };
535        let store = D1DataStore::new(exec, empty_manifest());
536        let id = store
537            .insert("Lot", &serde_json::json!({"title": "Test"}))
538            .unwrap();
539        assert!(!id.is_empty());
540    }
541
542    #[test]
543    fn d1_list_returns_rows() {
544        let exec = MockExecutor {
545            rows: Mutex::new(vec![serde_json::json!({"id":"a","title":"T"})]),
546        };
547        let store = D1DataStore::new(exec, empty_manifest());
548        let rows = store.list("Lot").unwrap();
549        assert_eq!(rows.len(), 1);
550    }
551
552    #[test]
553    fn d1_rejects_unknown_entity() {
554        let exec = MockExecutor {
555            rows: Mutex::new(vec![]),
556        };
557        let store = D1DataStore::new(exec, empty_manifest());
558        let err = store.list("Nope").unwrap_err();
559        assert_eq!(err.code, "ENTITY_NOT_FOUND");
560    }
561
562    #[test]
563    fn d1_rejects_unknown_column() {
564        let exec = MockExecutor {
565            rows: Mutex::new(vec![]),
566        };
567        let store = D1DataStore::new(exec, empty_manifest());
568        let err = store
569            .insert("Lot", &serde_json::json!({"evil": "x"}))
570            .unwrap_err();
571        assert_eq!(err.code, "INVALID_COLUMN");
572    }
573}