1use regex::Regex;
14use reqwest::Client;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value, json};
17use std::collections::HashMap;
18use std::sync::LazyLock;
19use std::sync::{Arc, RwLock};
20
21use super::CollectionQuery;
22use crate::Result;
23
/// Pattern for names that may be spliced into SQL text as identifiers: an
/// ASCII letter or underscore followed by at most 127 further ASCII letters,
/// digits or underscores. Compiled once on first use.
static SAFE_IDENTIFIER_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]{0,127}$").unwrap());
28
29fn validate_identifier(name: &str) -> Result<()> {
30 if SAFE_IDENTIFIER_RE.is_match(name) {
31 Ok(())
32 } else {
33 Err(crate::Error::Data(format!(
34 "Invalid identifier: {:?}",
35 name
36 )))
37 }
38}
39
/// Layout of a table as derived from `PRAGMA table_info` and kept in the
/// schema cache.
#[derive(Clone, Debug)]
struct TableSchema {
    /// Column names of the table, with `id` left out.
    columns: Vec<String>,
    /// True when the table has exactly the two columns `id` and `data`; rows of
    /// such tables carry their fields as a JSON document in `data`.
    is_json_blob: bool,
}
48
/// Handle to a Cloudflare D1 database reached through its HTTP query endpoint.
///
/// Clones share the same schema cache through the `Arc`.
#[derive(Clone)]
pub struct D1Database {
    /// HTTP client used for every request.
    client: Client,
    /// Account id placed in the endpoint URL.
    account_id: String,
    /// Database id placed in the endpoint URL.
    database_id: String,
    /// Token sent as the bearer credential.
    api_token: String,
    /// Per-table schema lookups, keyed by table name.
    schema_cache: Arc<RwLock<HashMap<String, TableSchema>>>,
}
59
/// Top-level envelope of a response from the D1 query endpoint.
#[derive(Deserialize)]
struct D1Response {
    /// Success flag; `query` turns `false` into an error.
    success: bool,
    /// Result sets; `query` reads only the first entry.
    /// NOTE(review): presumably one entry per executed statement — confirm
    /// against the D1 API reference.
    result: Option<Vec<D1QueryResult>>,
    /// Error details; `query` reports the first message when `success` is false.
    errors: Option<Vec<D1Error>>,
}
67
/// A single result set inside a [`D1Response`].
#[derive(Deserialize)]
struct D1QueryResult {
    /// Returned rows as JSON values; callers read columns from them by name.
    /// A missing field is treated as "no rows" by `query`.
    results: Option<Vec<Value>>,
}
72
/// One error entry of a [`D1Response`].
#[derive(Deserialize)]
struct D1Error {
    /// Error text that `query` embeds in the error it returns.
    message: String,
}
77
/// JSON request body posted to the D1 query endpoint.
#[derive(Serialize)]
struct D1Query {
    /// SQL text to run.
    sql: String,
    /// Bound parameters; the field is omitted from the JSON when the list is
    /// empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    params: Vec<Value>,
}
85
86impl D1Database {
87 pub fn new(account_id: &str, database_id: &str, api_token: &str) -> Self {
88 Self {
89 client: Client::new(),
90 account_id: account_id.to_string(),
91 database_id: database_id.to_string(),
92 api_token: api_token.to_string(),
93 schema_cache: Arc::new(RwLock::new(HashMap::new())),
94 }
95 }
96
97 pub async fn init(&self) -> crate::Result<()> {
100 self.execute(
101 "CREATE TABLE IF NOT EXISTS _kv_store (key TEXT PRIMARY KEY, value TEXT NOT NULL)",
102 vec![],
103 )
104 .await?;
105 self.execute(
106 "CREATE TABLE IF NOT EXISTS _collections (name TEXT PRIMARY KEY)",
107 vec![],
108 )
109 .await?;
110 Ok(())
111 }
112
113 async fn query(&self, sql: &str, params: Vec<Value>) -> Result<Vec<Value>> {
115 let url = format!(
116 "https://api.cloudflare.com/client/v4/accounts/{}/d1/database/{}/query",
117 self.account_id, self.database_id
118 );
119
120 let body = D1Query {
121 sql: sql.to_string(),
122 params,
123 };
124
125 let resp = self
126 .client
127 .post(&url)
128 .bearer_auth(&self.api_token)
129 .json(&body)
130 .send()
131 .await
132 .map_err(|e| crate::Error::Data(format!("D1 request failed: {}", e)))?;
133
134 let status = resp.status();
135 let text = resp
136 .text()
137 .await
138 .map_err(|e| crate::Error::Data(format!("D1 response read error: {}", e)))?;
139
140 let d1_resp: D1Response = serde_json::from_str(&text).map_err(|_| {
141 crate::Error::Data(format!("D1 response parse error (status {})", status))
142 })?;
143
144 if !d1_resp.success {
145 let msg = d1_resp
146 .errors
147 .and_then(|e| e.first().map(|err| err.message.clone()))
148 .unwrap_or_else(|| "unknown D1 error".to_string());
149 return Err(crate::Error::Data(format!("D1 error: {}", msg)));
150 }
151
152 Ok(d1_resp
153 .result
154 .and_then(|r| r.into_iter().next())
155 .and_then(|r| r.results)
156 .unwrap_or_default())
157 }
158
159 async fn execute(&self, sql: &str, params: Vec<Value>) -> Result<()> {
161 self.query(sql, params).await?;
162 Ok(())
163 }
164
165 async fn get_schema(&self, table: &str) -> Result<TableSchema> {
171 {
173 let cache = self.schema_cache.read().unwrap_or_else(|e| e.into_inner());
174 if let Some(schema) = cache.get(table) {
175 return Ok(schema.clone());
176 }
177 }
178
179 let rows = self
181 .query(&format!("PRAGMA table_info(\"{}\")", table), vec![])
182 .await?;
183
184 let columns: Vec<String> = rows
185 .iter()
186 .filter_map(|row| {
187 row.get("name")
188 .and_then(|v| v.as_str())
189 .map(|s| s.to_string())
190 })
191 .collect();
192
193 let is_json_blob = columns.len() == 2
195 && columns.contains(&"id".to_string())
196 && columns.contains(&"data".to_string());
197
198 let non_id_columns: Vec<String> = columns.into_iter().filter(|c| c != "id").collect();
199
200 let schema = TableSchema {
201 columns: non_id_columns,
202 is_json_blob,
203 };
204
205 {
207 let mut cache = self.schema_cache.write().unwrap_or_else(|e| e.into_inner());
208 cache.insert(table.to_string(), schema.clone());
209 }
210
211 Ok(schema)
212 }
213
214 fn row_to_item_with_schema(row: &Value, schema: &TableSchema) -> Value {
222 if schema.is_json_blob {
223 let id = row.get("id").cloned().unwrap_or(json!(0));
225 let data_str = row.get("data").and_then(|v| v.as_str()).unwrap_or("{}");
226 let mut item: Value = serde_json::from_str(data_str).unwrap_or(json!({}));
227 if let Value::Object(ref mut map) = item {
228 map.insert("id".to_string(), id);
229 }
230 item
231 } else {
232 row.clone()
234 }
235 }
236
237 pub async fn get_collection(&self, name: &str) -> Result<Vec<Value>> {
242 validate_identifier(name)?;
243 let schema = self.get_schema(name).await?;
244 let rows = self
245 .query(&format!("SELECT * FROM \"{}\"", name), vec![])
246 .await?;
247
248 Ok(rows
249 .into_iter()
250 .map(|row| Self::row_to_item_with_schema(&row, &schema))
251 .collect())
252 }
253
254 pub async fn query_collection(
255 &self,
256 name: &str,
257 query: &CollectionQuery,
258 ) -> Result<Vec<Value>> {
259 validate_identifier(name)?;
260 let schema = self.get_schema(name).await?;
261 let mut sql = format!("SELECT * FROM \"{}\"", name);
262 let mut params: Vec<Value> = Vec::new();
263 let mut conditions: Vec<String> = Vec::new();
264
265 if let Some(ref filter_expr) = query.filter {
267 if let Some(filter_sql) = build_d1_filter(filter_expr, &mut params, &schema) {
268 conditions.push(filter_sql);
269 }
270 }
271
272 for forced in &query.forced_filters {
277 if let Some(filter_sql) = build_d1_filter(forced, &mut params, &schema) {
278 conditions.push(filter_sql);
279 }
280 }
281
282 if let Some(ref search) = query.search {
284 if !search.is_empty() {
285 let idx = params.len() + 1;
286 params.push(json!(format!("%{}%", search)));
287 if schema.is_json_blob {
288 conditions.push(format!("data LIKE ?{}", idx));
289 } else {
290 let col_searches: Vec<String> = schema
292 .columns
293 .iter()
294 .map(|col| format!("CAST(\"{}\" AS TEXT) LIKE ?{}", col, idx))
295 .collect();
296 if !col_searches.is_empty() {
297 conditions.push(format!("({})", col_searches.join(" OR ")));
298 }
299 }
300 }
301 }
302
303 if !conditions.is_empty() {
304 sql.push_str(" WHERE ");
305 sql.push_str(&conditions.join(" AND "));
306 }
307
308 if let Some(ref sort) = query.sort {
310 let (field, desc) = if let Some((f, d)) = sort.rsplit_once(':') {
311 (f, d.eq_ignore_ascii_case("desc"))
312 } else {
313 (sort.as_str(), false)
314 };
315 validate_identifier(field)?;
316 if schema.is_json_blob {
317 sql.push_str(&format!(
318 " ORDER BY json_extract(data, '$.{}') {}",
319 field,
320 if desc { "DESC" } else { "ASC" }
321 ));
322 } else {
323 sql.push_str(&format!(
324 " ORDER BY \"{}\" {}",
325 field,
326 if desc { "DESC" } else { "ASC" }
327 ));
328 }
329 }
330
331 if let Some(limit) = query.limit {
332 sql.push_str(&format!(" LIMIT {}", limit));
333 }
334 if let Some(offset) = query.offset {
335 sql.push_str(&format!(" OFFSET {}", offset));
336 }
337
338 let rows = self.query(&sql, params).await?;
339 Ok(rows
340 .into_iter()
341 .map(|row| Self::row_to_item_with_schema(&row, &schema))
342 .collect())
343 }
344
345 pub async fn find_by(
346 &self,
347 collection: &str,
348 field: &str,
349 value: &Value,
350 ) -> Result<Vec<Value>> {
351 validate_identifier(collection)?;
352 validate_identifier(field)?;
353 let schema = self.get_schema(collection).await?;
354
355 let sql = if schema.is_json_blob {
356 format!(
357 "SELECT * FROM \"{}\" WHERE json_extract(data, '$.{}') = ?1",
358 collection, field
359 )
360 } else {
361 format!("SELECT * FROM \"{}\" WHERE \"{}\" = ?1", collection, field)
362 };
363
364 let rows = self.query(&sql, vec![value.clone()]).await?;
365 Ok(rows
366 .into_iter()
367 .map(|row| Self::row_to_item_with_schema(&row, &schema))
368 .collect())
369 }
370
371 pub async fn find_one_by(
372 &self,
373 collection: &str,
374 field: &str,
375 value: &Value,
376 ) -> Result<Option<Value>> {
377 validate_identifier(collection)?;
378 validate_identifier(field)?;
379 let schema = self.get_schema(collection).await?;
380
381 let sql = if schema.is_json_blob {
382 format!(
383 "SELECT * FROM \"{}\" WHERE json_extract(data, '$.{}') = ?1 LIMIT 1",
384 collection, field
385 )
386 } else {
387 format!(
388 "SELECT * FROM \"{}\" WHERE \"{}\" = ?1 LIMIT 1",
389 collection, field
390 )
391 };
392
393 let rows = self.query(&sql, vec![value.clone()]).await?;
394 Ok(rows
395 .into_iter()
396 .next()
397 .map(|row| Self::row_to_item_with_schema(&row, &schema)))
398 }
399
400 pub async fn create(&self, collection: &str, mut item: Value) -> Result<Value> {
401 validate_identifier(collection)?;
402 let schema = self.get_schema(collection).await?;
403
404 if schema.is_json_blob {
405 if let Value::Object(ref mut map) = item {
407 map.remove("id");
408 }
409 let data_str = serde_json::to_string(&item)?;
410
411 let rows = self
412 .query(
413 &format!(
414 "INSERT INTO \"{}\" (data) VALUES (?1) RETURNING id",
415 collection
416 ),
417 vec![json!(data_str)],
418 )
419 .await?;
420
421 let id = rows
422 .first()
423 .and_then(|r| r.get("id"))
424 .cloned()
425 .unwrap_or(json!(0));
426
427 if let Value::Object(ref mut map) = item {
428 map.insert("id".to_string(), id);
429 }
430 Ok(item)
431 } else {
432 if let Value::Object(ref mut map) = item {
434 map.remove("id");
435 }
436
437 let mut col_names = Vec::new();
438 let mut placeholders = Vec::new();
439 let mut params = Vec::new();
440
441 if let Value::Object(ref map) = item {
442 let mut param_idx = 1;
443 for (key, val) in map.iter() {
444 if schema.columns.contains(key) {
445 col_names.push(format!("\"{}\"", key));
446 placeholders.push(format!("?{}", param_idx));
447 params.push(val.clone());
448 param_idx += 1;
449 }
450 }
451 }
452
453 let sql = format!(
454 "INSERT INTO \"{}\" ({}) VALUES ({}) RETURNING *",
455 collection,
456 col_names.join(", "),
457 placeholders.join(", ")
458 );
459
460 let rows = self.query(&sql, params).await?;
461 Ok(rows.into_iter().next().unwrap_or(item))
462 }
463 }
464
465 pub async fn update(
466 &self,
467 collection: &str,
468 id: &Value,
469 updates: Value,
470 ) -> Result<Option<Value>> {
471 validate_identifier(collection)?;
472 let schema = self.get_schema(collection).await?;
473
474 if schema.is_json_blob {
475 let rows = self
477 .query(
478 &format!("SELECT * FROM \"{}\" WHERE id = ?1", collection),
479 vec![id.clone()],
480 )
481 .await?;
482
483 let row = match rows.into_iter().next() {
484 Some(r) => r,
485 None => return Ok(None),
486 };
487
488 let mut current = Self::row_to_item_with_schema(&row, &schema);
489
490 if let (Value::Object(map), Value::Object(updates_map)) = (&mut current, &updates) {
491 for (k, v) in updates_map {
492 map.insert(k.clone(), v.clone());
493 }
494 }
495
496 let mut data_for_storage = current.clone();
497 if let Value::Object(ref mut map) = data_for_storage {
498 map.remove("id");
499 }
500 let data_str = serde_json::to_string(&data_for_storage)?;
501
502 self.execute(
503 &format!("UPDATE \"{}\" SET data = ?1 WHERE id = ?2", collection),
504 vec![json!(data_str), id.clone()],
505 )
506 .await?;
507
508 Ok(Some(current))
509 } else {
510 let mut set_clauses = Vec::new();
512 let mut params = Vec::new();
513 let mut idx = 1;
514
515 if let Value::Object(ref map) = updates {
516 for (key, val) in map {
517 if key != "id" && schema.columns.contains(key) {
518 set_clauses.push(format!("\"{}\" = ?{}", key, idx));
519 params.push(val.clone());
520 idx += 1;
521 }
522 }
523 }
524
525 if set_clauses.is_empty() {
526 return Ok(None);
527 }
528
529 params.push(id.clone());
530 let sql = format!(
531 "UPDATE \"{}\" SET {} WHERE id = ?{} RETURNING *",
532 collection,
533 set_clauses.join(", "),
534 idx
535 );
536
537 let rows = self.query(&sql, params).await?;
538 Ok(rows.into_iter().next())
539 }
540 }
541
542 pub async fn delete(&self, collection: &str, id: &Value) -> Result<bool> {
543 validate_identifier(collection)?;
544 let rows = self
545 .query(
546 &format!("SELECT id FROM \"{}\" WHERE id = ?1", collection),
547 vec![id.clone()],
548 )
549 .await?;
550
551 if rows.is_empty() {
552 return Ok(false);
553 }
554
555 self.execute(
556 &format!("DELETE FROM \"{}\" WHERE id = ?1", collection),
557 vec![id.clone()],
558 )
559 .await?;
560
561 Ok(true)
562 }
563
564 pub async fn set(&self, key: &str, value: Value) -> Result<()> {
569 let value_str = serde_json::to_string(&value)?;
570 self.execute(
571 "INSERT OR REPLACE INTO _kv_store (key, value) VALUES (?1, ?2)",
572 vec![json!(key), json!(value_str)],
573 )
574 .await
575 }
576
577 pub async fn get(&self, key: &str) -> Result<Option<Value>> {
578 let rows = self
579 .query(
580 "SELECT value FROM _kv_store WHERE key = ?1",
581 vec![json!(key)],
582 )
583 .await?;
584
585 Ok(rows.into_iter().next().and_then(|row| {
586 row.get("value")
587 .and_then(|v| v.as_str())
588 .and_then(|s| serde_json::from_str(s).ok())
589 }))
590 }
591
592 pub async fn remove(&self, key: &str) -> Result<Option<Value>> {
593 let existing = self.get(key).await?;
594 if existing.is_some() {
595 self.execute("DELETE FROM _kv_store WHERE key = ?1", vec![json!(key)])
596 .await?;
597 }
598 Ok(existing)
599 }
600
601 pub async fn atomic_modify<F>(&self, key: &str, f: F) -> Result<Value>
602 where
603 F: FnOnce(Option<&Value>) -> Value,
604 {
605 let current = self.get(key).await?;
606 let new_value = f(current.as_ref());
607 self.set(key, new_value.clone()).await?;
608 Ok(new_value)
609 }
610
611 pub async fn as_context(&self) -> Result<HashMap<String, Value>> {
616 let mut context = HashMap::new();
617
618 let collections = self.query("SELECT name FROM _collections", vec![]).await?;
619
620 for row in collections {
621 if let Some(name) = row.get("name").and_then(|v| v.as_str()) {
622 if let Ok(items) = self.get_collection(name).await {
623 context.insert(name.to_string(), json!(items));
624 }
625 }
626 }
627
628 let kv_rows = self
629 .query("SELECT key, value FROM _kv_store", vec![])
630 .await?;
631 for row in kv_rows {
632 if let (Some(key), Some(value_str)) = (
633 row.get("key").and_then(|v| v.as_str()),
634 row.get("value").and_then(|v| v.as_str()),
635 ) {
636 if let Ok(value) = serde_json::from_str::<Value>(value_str) {
637 context.insert(key.to_string(), value);
638 }
639 }
640 }
641
642 Ok(context)
643 }
644
645 pub fn invalidate_schema_cache(&self) {
648 let mut cache = self.schema_cache.write().unwrap_or_else(|e| e.into_inner());
649 cache.clear();
650 }
651
652 pub async fn set_collection(&self, name: &str, items: Vec<Value>) -> Result<()> {
653 validate_identifier(name)?;
654 self.execute(&format!("DELETE FROM \"{}\"", name), vec![])
655 .await?;
656 for item in items {
657 self.create(name, item).await?;
658 }
659 Ok(())
660 }
661}
662
663fn build_d1_filter(
665 filter_expr: &str,
666 params: &mut Vec<Value>,
667 schema: &TableSchema,
668) -> Option<String> {
669 let or_groups: Vec<&str> = filter_expr.split(',').collect();
670 let mut or_parts: Vec<String> = Vec::new();
671
672 for group in or_groups {
673 let and_conditions: Vec<&str> = group.split('&').collect();
674 let mut and_parts: Vec<String> = Vec::new();
675
676 for cond in and_conditions {
677 let cond = cond.trim();
678 if let Some(sql) = build_d1_condition(cond, params, schema) {
679 and_parts.push(sql);
680 }
681 }
682
683 if !and_parts.is_empty() {
684 or_parts.push(if and_parts.len() == 1 {
685 and_parts.into_iter().next().unwrap()
686 } else {
687 format!("({})", and_parts.join(" AND "))
688 });
689 }
690 }
691
692 if or_parts.is_empty() {
693 None
694 } else if or_parts.len() == 1 {
695 Some(or_parts.into_iter().next().unwrap())
696 } else {
697 Some(format!("({})", or_parts.join(" OR ")))
698 }
699}
700
701fn build_d1_condition(cond: &str, params: &mut Vec<Value>, schema: &TableSchema) -> Option<String> {
702 let operators = [">=", "<=", ">", "<", "="];
703 for op in operators {
704 if let Some((field, val)) = cond.split_once(op) {
705 let field = field.trim();
706 let val = val.trim();
707 if !SAFE_IDENTIFIER_RE.is_match(field) {
708 return None;
709 }
710 let idx = params.len() + 1;
711 params.push(json!(val));
712 if schema.is_json_blob {
713 return Some(format!("json_extract(data, '$.{}') {} ?{}", field, op, idx));
714 } else {
715 return Some(format!("\"{}\" {} ?{}", field, op, idx));
716 }
717 }
718 }
719 None
720}