1use pylon_http::{DataError, DataStore};
10use pylon_kernel::{AppManifest, ManifestEntity};
11use serde_json::Value;
12
13pub trait D1Executor: Send + Sync {
22 fn execute(&self, sql: &str, params: &[Value]) -> Result<u64, String>;
24
25 fn query(&self, sql: &str, params: &[Value]) -> Result<Vec<Value>, String>;
27
28 fn query_one(&self, sql: &str, params: &[Value]) -> Result<Option<Value>, String> {
30 let rows = self.query(sql, params)?;
31 Ok(rows.into_iter().next())
32 }
33}
34
35pub struct D1DataStore<E: D1Executor> {
40 executor: E,
41 manifest: AppManifest,
42}
43
44impl<E: D1Executor> D1DataStore<E> {
45 pub fn new(executor: E, manifest: AppManifest) -> Self {
46 Self { executor, manifest }
47 }
48
49 fn entity(&self, name: &str) -> Result<&ManifestEntity, DataError> {
50 self.manifest
51 .entities
52 .iter()
53 .find(|e| e.name == name)
54 .ok_or_else(|| DataError {
55 code: "ENTITY_NOT_FOUND".into(),
56 message: format!("Unknown entity: \"{name}\""),
57 })
58 }
59
60 fn validate_column(&self, entity: &ManifestEntity, col: &str) -> Result<(), DataError> {
61 if col == "id" || entity.fields.iter().any(|f| f.name == col) {
62 Ok(())
63 } else {
64 Err(DataError {
65 code: "INVALID_COLUMN".into(),
66 message: format!("Unknown column \"{col}\" on entity \"{}\"", entity.name),
67 })
68 }
69 }
70}
71
72use pylon_kernel::util::quote_ident;
73
74fn generate_id() -> String {
75 let now = std::time::SystemTime::now()
77 .duration_since(std::time::UNIX_EPOCH)
78 .unwrap_or_default()
79 .as_nanos();
80 format!("{:x}", now)
81}
82
83impl<E: D1Executor> DataStore for D1DataStore<E> {
88 fn manifest(&self) -> &AppManifest {
89 &self.manifest
90 }
91
92 fn insert(&self, entity: &str, data: &Value) -> Result<String, DataError> {
93 let ent = self.entity(entity)?;
94 let obj = data.as_object().ok_or_else(|| DataError {
95 code: "INVALID_DATA".into(),
96 message: "Insert data must be a JSON object".into(),
97 })?;
98
99 let id = generate_id();
100 let mut cols = vec![quote_ident("id")];
101 let mut placeholders = vec!["?1".to_string()];
102 let mut params: Vec<Value> = vec![Value::String(id.clone())];
103 let mut idx = 2;
104
105 for (k, v) in obj {
106 if k == "id" {
107 continue;
108 }
109 self.validate_column(ent, k)?;
110 cols.push(quote_ident(k));
111 placeholders.push(format!("?{idx}"));
112 params.push(v.clone());
113 idx += 1;
114 }
115
116 let sql = format!(
117 "INSERT INTO {} ({}) VALUES ({})",
118 quote_ident(entity),
119 cols.join(", "),
120 placeholders.join(", ")
121 );
122
123 self.executor
124 .execute(&sql, ¶ms)
125 .map_err(|e| DataError {
126 code: "INSERT_FAILED".into(),
127 message: e,
128 })?;
129
130 Ok(id)
131 }
132
133 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<Value>, DataError> {
134 let _ = self.entity(entity)?;
135 let sql = format!(
136 "SELECT * FROM {} WHERE \"id\" = ?1 LIMIT 1",
137 quote_ident(entity)
138 );
139 self.executor
140 .query_one(&sql, &[Value::String(id.to_string())])
141 .map_err(|e| DataError {
142 code: "QUERY_FAILED".into(),
143 message: e,
144 })
145 }
146
147 fn list(&self, entity: &str) -> Result<Vec<Value>, DataError> {
148 let _ = self.entity(entity)?;
149 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
150 self.executor.query(&sql, &[]).map_err(|e| DataError {
151 code: "QUERY_FAILED".into(),
152 message: e,
153 })
154 }
155
156 fn list_after(
157 &self,
158 entity: &str,
159 after: Option<&str>,
160 limit: usize,
161 ) -> Result<Vec<Value>, DataError> {
162 let _ = self.entity(entity)?;
163 let (sql, params): (String, Vec<Value>) = match after {
164 Some(cursor) => (
165 format!(
166 "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
167 quote_ident(entity)
168 ),
169 vec![
170 Value::String(cursor.to_string()),
171 Value::Number((limit as u64).into()),
172 ],
173 ),
174 None => (
175 format!(
176 "SELECT * FROM {} ORDER BY \"id\" LIMIT ?1",
177 quote_ident(entity)
178 ),
179 vec![Value::Number((limit as u64).into())],
180 ),
181 };
182
183 self.executor.query(&sql, ¶ms).map_err(|e| DataError {
184 code: "QUERY_FAILED".into(),
185 message: e,
186 })
187 }
188
189 fn update(&self, entity: &str, id: &str, data: &Value) -> Result<bool, DataError> {
190 let ent = self.entity(entity)?;
191 let obj = data.as_object().ok_or_else(|| DataError {
192 code: "INVALID_DATA".into(),
193 message: "Update data must be a JSON object".into(),
194 })?;
195
196 let mut sets = Vec::new();
197 let mut params: Vec<Value> = Vec::new();
198 let mut idx = 1;
199 for (k, v) in obj {
200 if k == "id" {
201 continue;
202 }
203 self.validate_column(ent, k)?;
204 sets.push(format!("{} = ?{idx}", quote_ident(k)));
205 params.push(v.clone());
206 idx += 1;
207 }
208 if sets.is_empty() {
209 return Ok(false);
210 }
211 params.push(Value::String(id.to_string()));
212 let sql = format!(
213 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
214 quote_ident(entity),
215 sets.join(", ")
216 );
217 let affected = self
218 .executor
219 .execute(&sql, ¶ms)
220 .map_err(|e| DataError {
221 code: "UPDATE_FAILED".into(),
222 message: e,
223 })?;
224 Ok(affected > 0)
225 }
226
227 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
228 let _ = self.entity(entity)?;
229 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
230 let affected = self
231 .executor
232 .execute(&sql, &[Value::String(id.to_string())])
233 .map_err(|e| DataError {
234 code: "DELETE_FAILED".into(),
235 message: e,
236 })?;
237 Ok(affected > 0)
238 }
239
240 fn lookup(&self, entity: &str, field: &str, value: &str) -> Result<Option<Value>, DataError> {
241 let ent = self.entity(entity)?;
242 self.validate_column(ent, field)?;
243 let sql = format!(
244 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
245 quote_ident(entity),
246 quote_ident(field)
247 );
248 self.executor
249 .query_one(&sql, &[Value::String(value.to_string())])
250 .map_err(|e| DataError {
251 code: "QUERY_FAILED".into(),
252 message: e,
253 })
254 }
255
256 fn link(
257 &self,
258 entity: &str,
259 id: &str,
260 relation: &str,
261 target_id: &str,
262 ) -> Result<bool, DataError> {
263 let ent = self.entity(entity)?;
264 let rel = ent
265 .relations
266 .iter()
267 .find(|r| r.name == relation)
268 .ok_or_else(|| DataError {
269 code: "RELATION_NOT_FOUND".into(),
270 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
271 })?;
272 let data = serde_json::json!({ rel.field.clone(): target_id });
273 self.update(entity, id, &data)
274 }
275
276 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
277 let ent = self.entity(entity)?;
278 let rel = ent
279 .relations
280 .iter()
281 .find(|r| r.name == relation)
282 .ok_or_else(|| DataError {
283 code: "RELATION_NOT_FOUND".into(),
284 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
285 })?;
286 let data = serde_json::json!({ rel.field.clone(): Value::Null });
287 self.update(entity, id, &data)
288 }
289
290 fn query_filtered(&self, entity: &str, filter: &Value) -> Result<Vec<Value>, DataError> {
291 let ent = self.entity(entity)?;
292 let empty = serde_json::Map::new();
293 let obj = filter.as_object().unwrap_or(&empty);
294
295 let mut where_clauses: Vec<String> = Vec::new();
296 let mut params: Vec<Value> = Vec::new();
297 let mut order_clause = String::new();
298 let mut limit_clause = String::new();
299 let mut idx = 1;
300
301 for (k, v) in obj {
302 match k.as_str() {
303 "$order" => {
304 if let Some(o) = v.as_object() {
305 let mut parts = Vec::new();
306 for (col, dir) in o {
307 self.validate_column(ent, col)?;
308 let d = match dir.as_str().unwrap_or("asc") {
309 "desc" | "DESC" => "DESC",
310 _ => "ASC",
311 };
312 parts.push(format!("{} {d}", quote_ident(col)));
313 }
314 if !parts.is_empty() {
315 order_clause = format!(" ORDER BY {}", parts.join(", "));
316 }
317 }
318 }
319 "$limit" => {
320 if let Some(n) = v.as_u64() {
321 limit_clause = format!(" LIMIT {n}");
322 }
323 }
324 _ => {
325 self.validate_column(ent, k)?;
326 let qk = quote_ident(k);
327 if let Some(op_obj) = v.as_object() {
328 for (op, op_val) in op_obj {
329 match op.as_str() {
330 "$not" => {
331 where_clauses.push(format!("{qk} != ?{idx}"));
332 params.push(op_val.clone());
333 idx += 1;
334 }
335 "$gt" => {
336 where_clauses.push(format!("{qk} > ?{idx}"));
337 params.push(op_val.clone());
338 idx += 1;
339 }
340 "$gte" => {
341 where_clauses.push(format!("{qk} >= ?{idx}"));
342 params.push(op_val.clone());
343 idx += 1;
344 }
345 "$lt" => {
346 where_clauses.push(format!("{qk} < ?{idx}"));
347 params.push(op_val.clone());
348 idx += 1;
349 }
350 "$lte" => {
351 where_clauses.push(format!("{qk} <= ?{idx}"));
352 params.push(op_val.clone());
353 idx += 1;
354 }
355 "$like" => {
356 where_clauses.push(format!("{qk} LIKE ?{idx}"));
357 let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
358 params.push(Value::String(pattern));
359 idx += 1;
360 }
361 "$in" => {
362 if let Some(arr) = op_val.as_array() {
363 let ph: Vec<String> = arr
364 .iter()
365 .map(|v| {
366 let p = format!("?{idx}");
367 params.push(v.clone());
368 idx += 1;
369 p
370 })
371 .collect();
372 if !ph.is_empty() {
373 where_clauses
374 .push(format!("{qk} IN ({})", ph.join(", ")));
375 }
376 }
377 }
378 _ => {}
379 }
380 }
381 } else {
382 where_clauses.push(format!("{qk} = ?{idx}"));
383 params.push(v.clone());
384 idx += 1;
385 }
386 }
387 }
388 }
389
390 let where_sql = if where_clauses.is_empty() {
391 String::new()
392 } else {
393 format!(" WHERE {}", where_clauses.join(" AND "))
394 };
395 if order_clause.is_empty() {
396 order_clause = " ORDER BY \"id\"".into();
397 }
398
399 let sql = format!(
400 "SELECT * FROM {}{}{}{}",
401 quote_ident(entity),
402 where_sql,
403 order_clause,
404 limit_clause
405 );
406
407 self.executor.query(&sql, ¶ms).map_err(|e| DataError {
408 code: "QUERY_FAILED".into(),
409 message: e,
410 })
411 }
412
413 fn query_graph(&self, query: &Value) -> Result<Value, DataError> {
414 let obj = query.as_object().ok_or_else(|| DataError {
415 code: "INVALID_QUERY".into(),
416 message: "Graph query must be a JSON object".into(),
417 })?;
418 let mut results = serde_json::Map::new();
419 for (entity_name, opts) in obj {
420 let filter = opts.get("where").cloned().unwrap_or(serde_json::json!({}));
421 let rows = self.query_filtered(entity_name, &filter)?;
422 results.insert(entity_name.clone(), Value::Array(rows));
423 }
424 Ok(Value::Object(results))
425 }
426
427 fn transact(&self, ops: &[Value]) -> Result<(bool, Vec<Value>), DataError> {
428 let mut results = Vec::new();
432 let mut rollback = false;
433 for op in ops {
434 let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
435 let entity = op.get("entity").and_then(|v| v.as_str()).unwrap_or("");
436 match op_type {
437 "insert" => {
438 let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
439 match self.insert(entity, &data) {
440 Ok(id) => results.push(serde_json::json!({"op":"insert","id":id})),
441 Err(e) => {
442 rollback = true;
443 results.push(serde_json::json!({"op":"insert","error":e.message}));
444 break;
445 }
446 }
447 }
448 "update" => {
449 let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
450 let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
451 match self.update(entity, id, &data) {
452 Ok(_) => results.push(serde_json::json!({"op":"update","id":id})),
453 Err(e) => {
454 rollback = true;
455 results.push(serde_json::json!({"op":"update","error":e.message}));
456 break;
457 }
458 }
459 }
460 "delete" => {
461 let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
462 match self.delete(entity, id) {
463 Ok(_) => results.push(serde_json::json!({"op":"delete","id":id})),
464 Err(e) => {
465 rollback = true;
466 results.push(serde_json::json!({"op":"delete","error":e.message}));
467 break;
468 }
469 }
470 }
471 _ => {
472 results.push(serde_json::json!({"op":op_type,"error":"unknown operation"}));
473 }
474 }
475 }
476 Ok((!rollback, results))
477 }
478}
479
480#[cfg(test)]
485mod tests {
486 use super::*;
487 use std::sync::Mutex;
488
489 struct MockExecutor {
491 rows: Mutex<Vec<Value>>,
492 }
493
494 impl D1Executor for MockExecutor {
495 fn execute(&self, _sql: &str, _params: &[Value]) -> Result<u64, String> {
496 Ok(1)
497 }
498 fn query(&self, _sql: &str, _params: &[Value]) -> Result<Vec<Value>, String> {
499 Ok(self.rows.lock().unwrap().clone())
500 }
501 }
502
503 fn empty_manifest() -> AppManifest {
504 AppManifest {
505 manifest_version: pylon_kernel::MANIFEST_VERSION,
506 name: "t".into(),
507 version: "0".into(),
508 entities: vec![ManifestEntity {
509 name: "Lot".into(),
510 fields: vec![pylon_kernel::ManifestField {
511 name: "title".into(),
512 field_type: "string".into(),
513 optional: false,
514 unique: false,
515 crdt: None,
516 }],
517 indexes: vec![],
518 relations: vec![],
519 search: None,
520 crdt: true,
521 }],
522 routes: vec![],
523 queries: vec![],
524 actions: vec![],
525 policies: vec![],
526 auth: Default::default(),
527 }
528 }
529
530 #[test]
531 fn d1_insert_generates_id() {
532 let exec = MockExecutor {
533 rows: Mutex::new(vec![]),
534 };
535 let store = D1DataStore::new(exec, empty_manifest());
536 let id = store
537 .insert("Lot", &serde_json::json!({"title": "Test"}))
538 .unwrap();
539 assert!(!id.is_empty());
540 }
541
542 #[test]
543 fn d1_list_returns_rows() {
544 let exec = MockExecutor {
545 rows: Mutex::new(vec![serde_json::json!({"id":"a","title":"T"})]),
546 };
547 let store = D1DataStore::new(exec, empty_manifest());
548 let rows = store.list("Lot").unwrap();
549 assert_eq!(rows.len(), 1);
550 }
551
552 #[test]
553 fn d1_rejects_unknown_entity() {
554 let exec = MockExecutor {
555 rows: Mutex::new(vec![]),
556 };
557 let store = D1DataStore::new(exec, empty_manifest());
558 let err = store.list("Nope").unwrap_err();
559 assert_eq!(err.code, "ENTITY_NOT_FOUND");
560 }
561
562 #[test]
563 fn d1_rejects_unknown_column() {
564 let exec = MockExecutor {
565 rows: Mutex::new(vec![]),
566 };
567 let store = D1DataStore::new(exec, empty_manifest());
568 let err = store
569 .insert("Lot", &serde_json::json!({"evil": "x"}))
570 .unwrap_err();
571 assert_eq!(err.code, "INVALID_COLUMN");
572 }
573}