use regex::Regex;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::{Arc, RwLock};
use super::CollectionQuery;
use crate::Result;
/// Pattern for names that are allowed to be spliced directly into SQL text:
/// an ASCII letter or underscore followed by at most 127 ASCII letters,
/// digits, or underscores (128 characters total). Compiled once on first use.
static SAFE_IDENTIFIER_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]{0,127}$").unwrap());
fn validate_identifier(name: &str) -> Result<()> {
if SAFE_IDENTIFIER_RE.is_match(name) {
Ok(())
} else {
Err(crate::Error::Data(format!(
"Invalid identifier: {:?}",
name
)))
}
}
/// Cached description of one table, derived from `PRAGMA table_info`.
#[derive(Clone, Debug)]
struct TableSchema {
// Column names reported for the table, with the `id` column filtered out.
columns: Vec<String>,
// True when the table has exactly two columns named `id` and `data`; such
// tables keep each item as a JSON string in `data`.
is_json_blob: bool,
}
/// Client for a single Cloudflare D1 database, accessed through the
/// `api.cloudflare.com` HTTP query endpoint.
#[derive(Clone)]
pub struct D1Database {
// HTTP client used for every query request.
client: Client,
// Account id placed in the endpoint URL.
account_id: String,
// Database id placed in the endpoint URL.
database_id: String,
// Token sent as the bearer credential on every request.
api_token: String,
// Per-table schema cache; held behind `Arc`, so clones of this struct share it.
schema_cache: Arc<RwLock<HashMap<String, TableSchema>>>,
}
/// Top-level JSON envelope deserialized from a D1 query response.
#[derive(Deserialize)]
struct D1Response {
// When false, the request is reported as an error using `errors`.
success: bool,
// Result sets; only the first one is consumed by `D1Database::query`.
result: Option<Vec<D1QueryResult>>,
// Error entries; the first message is surfaced when `success` is false.
errors: Option<Vec<D1Error>>,
}
/// One result set inside a `D1Response`.
#[derive(Deserialize)]
struct D1QueryResult {
// Rows of the result set as JSON values; may be absent in the response.
results: Option<Vec<Value>>,
}
/// One error entry inside a `D1Response`.
#[derive(Deserialize)]
struct D1Error {
// Error text as returned in the response body.
message: String,
}
/// JSON request body posted to the D1 query endpoint.
#[derive(Serialize)]
struct D1Query {
// SQL text to run.
sql: String,
// Values for the `?N` placeholders; left out of the JSON when empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
params: Vec<Value>,
}
impl D1Database {
/// Builds a client for the given account, database, and API token.
///
/// Only stores the arguments and creates an empty schema cache; no request
/// is sent here.
pub fn new(account_id: &str, database_id: &str, api_token: &str) -> Self {
    let schema_cache = Arc::new(RwLock::new(HashMap::new()));
    Self {
        client: Client::new(),
        account_id: account_id.to_owned(),
        database_id: database_id.to_owned(),
        api_token: api_token.to_owned(),
        schema_cache,
    }
}
/// Creates the `_kv_store` and `_collections` bookkeeping tables if they do
/// not exist yet.
///
/// # Errors
/// Stops at and returns the first statement failure.
pub async fn init(&self) -> crate::Result<()> {
    const BOOTSTRAP_DDL: [&str; 2] = [
        "CREATE TABLE IF NOT EXISTS _kv_store (key TEXT PRIMARY KEY, value TEXT NOT NULL)",
        "CREATE TABLE IF NOT EXISTS _collections (name TEXT PRIMARY KEY)",
    ];
    for ddl in BOOTSTRAP_DDL {
        self.execute(ddl, Vec::new()).await?;
    }
    Ok(())
}
async fn query(&self, sql: &str, params: Vec<Value>) -> Result<Vec<Value>> {
let url = format!(
"https://api.cloudflare.com/client/v4/accounts/{}/d1/database/{}/query",
self.account_id, self.database_id
);
let body = D1Query {
sql: sql.to_string(),
params,
};
let resp = self
.client
.post(&url)
.bearer_auth(&self.api_token)
.json(&body)
.send()
.await
.map_err(|e| crate::Error::Data(format!("D1 request failed: {}", e)))?;
let status = resp.status();
let text = resp
.text()
.await
.map_err(|e| crate::Error::Data(format!("D1 response read error: {}", e)))?;
let d1_resp: D1Response = serde_json::from_str(&text).map_err(|_| {
crate::Error::Data(format!("D1 response parse error (status {})", status))
})?;
if !d1_resp.success {
let msg = d1_resp
.errors
.and_then(|e| e.first().map(|err| err.message.clone()))
.unwrap_or_else(|| "unknown D1 error".to_string());
return Err(crate::Error::Data(format!("D1 error: {}", msg)));
}
Ok(d1_resp
.result
.and_then(|r| r.into_iter().next())
.and_then(|r| r.results)
.unwrap_or_default())
}
/// Runs a statement through [`Self::query`] and discards any returned rows.
async fn execute(&self, sql: &str, params: Vec<Value>) -> Result<()> {
    self.query(sql, params).await.map(|_rows| ())
}
async fn get_schema(&self, table: &str) -> Result<TableSchema> {
{
let cache = self.schema_cache.read().unwrap_or_else(|e| e.into_inner());
if let Some(schema) = cache.get(table) {
return Ok(schema.clone());
}
}
let rows = self
.query(&format!("PRAGMA table_info(\"{}\")", table), vec![])
.await?;
let columns: Vec<String> = rows
.iter()
.filter_map(|row| {
row.get("name")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
})
.collect();
let is_json_blob = columns.len() == 2
&& columns.contains(&"id".to_string())
&& columns.contains(&"data".to_string());
let non_id_columns: Vec<String> = columns.into_iter().filter(|c| c != "id").collect();
let schema = TableSchema {
columns: non_id_columns,
is_json_blob,
};
{
let mut cache = self.schema_cache.write().unwrap_or_else(|e| e.into_inner());
cache.insert(table.to_string(), schema.clone());
}
Ok(schema)
}
/// Converts a raw result row into the item handed back to callers.
///
/// For columnar tables the row is returned as a clone. For JSON-blob tables
/// the `data` string is parsed (falling back to `{}` when missing, not a
/// string, or unparseable) and, if the parsed value is an object, the row's
/// `id` (or `0` when absent) is inserted under `"id"`.
fn row_to_item_with_schema(row: &Value, schema: &TableSchema) -> Value {
    if !schema.is_json_blob {
        return row.clone();
    }

    let id = match row.get("id") {
        Some(found) => found.clone(),
        None => json!(0),
    };
    let raw = row.get("data").and_then(Value::as_str).unwrap_or("{}");
    let mut item = serde_json::from_str::<Value>(raw).unwrap_or_else(|_| json!({}));
    if let Some(fields) = item.as_object_mut() {
        fields.insert("id".to_string(), id);
    }
    item
}
/// Fetches every row of the collection table `name` as items.
///
/// # Errors
/// Fails when `name` is not a safe identifier or when either the schema
/// lookup or the `SELECT` fails.
pub async fn get_collection(&self, name: &str) -> Result<Vec<Value>> {
    validate_identifier(name)?;
    let schema = self.get_schema(name).await?;

    let select_all = format!("SELECT * FROM \"{}\"", name);
    let rows = self.query(&select_all, Vec::new()).await?;

    let mut items = Vec::with_capacity(rows.len());
    for row in &rows {
        items.push(Self::row_to_item_with_schema(row, &schema));
    }
    Ok(items)
}
/// Runs a filtered, searched, sorted, and paginated `SELECT` over the
/// collection table `name`.
///
/// * `query.filter` and each entry of `query.forced_filters` are translated
///   by `build_d1_filter`; all resulting conditions are AND-ed together.
/// * `query.search` (when non-empty) becomes a `LIKE '%…%'` match against
///   `data` for JSON-blob tables, or against every non-`id` column cast to
///   text otherwise. The search text is bound as-is, so `%` and `_` inside
///   it act as LIKE wildcards.
/// * `query.sort` is `field` or `field:desc` (case-insensitive); anything
///   other than `desc` after the last `:` sorts ascending.
/// * `query.limit` / `query.offset` map to `LIMIT` / `OFFSET`.
///
/// # Errors
/// Fails when `name` or the sort field is not a safe identifier, or when a
/// query against D1 fails.
pub async fn query_collection(
    &self,
    name: &str,
    query: &CollectionQuery,
) -> Result<Vec<Value>> {
    validate_identifier(name)?;
    let schema = self.get_schema(name).await?;

    let mut sql = format!("SELECT * FROM \"{}\"", name);
    let mut params: Vec<Value> = Vec::new();
    let mut conditions: Vec<String> = Vec::new();

    if let Some(ref filter_expr) = query.filter {
        if let Some(filter_sql) = build_d1_filter(filter_expr, &mut params, &schema) {
            conditions.push(filter_sql);
        }
    }
    for forced in &query.forced_filters {
        if let Some(filter_sql) = build_d1_filter(forced, &mut params, &schema) {
            conditions.push(filter_sql);
        }
    }

    if let Some(ref search) = query.search {
        if !search.is_empty() {
            let idx = params.len() + 1;
            let search_sql = if schema.is_json_blob {
                Some(format!("data LIKE ?{}", idx))
            } else if schema.columns.is_empty() {
                // Nothing to search in; emit no condition.
                None
            } else {
                let col_searches: Vec<String> = schema
                    .columns
                    .iter()
                    .map(|col| format!("CAST(\"{}\" AS TEXT) LIKE ?{}", col, idx))
                    .collect();
                Some(format!("({})", col_searches.join(" OR ")))
            };
            // Bind the pattern only when a condition actually references it,
            // so the parameter list never contains an unused entry.
            if let Some(search_sql) = search_sql {
                params.push(json!(format!("%{}%", search)));
                conditions.push(search_sql);
            }
        }
    }

    if !conditions.is_empty() {
        sql.push_str(" WHERE ");
        sql.push_str(&conditions.join(" AND "));
    }

    if let Some(ref sort) = query.sort {
        let (field, desc) = match sort.rsplit_once(':') {
            Some((f, d)) => (f, d.eq_ignore_ascii_case("desc")),
            None => (sort.as_str(), false),
        };
        // The field is spliced into SQL text, so it must be a safe identifier.
        validate_identifier(field)?;
        let direction = if desc { "DESC" } else { "ASC" };
        if schema.is_json_blob {
            sql.push_str(&format!(
                " ORDER BY json_extract(data, '$.{}') {}",
                field, direction
            ));
        } else {
            sql.push_str(&format!(" ORDER BY \"{}\" {}", field, direction));
        }
    }

    if let Some(limit) = query.limit {
        sql.push_str(&format!(" LIMIT {}", limit));
    } else if query.offset.is_some() {
        // SQLite only accepts OFFSET as part of a LIMIT clause; a negative
        // limit means "no upper bound".
        sql.push_str(" LIMIT -1");
    }
    if let Some(offset) = query.offset {
        sql.push_str(&format!(" OFFSET {}", offset));
    }

    let rows = self.query(&sql, params).await?;
    Ok(rows
        .into_iter()
        .map(|row| Self::row_to_item_with_schema(&row, &schema))
        .collect())
}
/// Returns every item in `collection` whose `field` equals `value`.
///
/// JSON-blob tables compare against `json_extract(data, '$.<field>')`;
/// columnar tables compare against the column itself. `value` is bound as a
/// parameter.
///
/// # Errors
/// Fails when `collection` or `field` is not a safe identifier, or when a
/// query against D1 fails.
pub async fn find_by(
    &self,
    collection: &str,
    field: &str,
    value: &Value,
) -> Result<Vec<Value>> {
    validate_identifier(collection)?;
    validate_identifier(field)?;
    let schema = self.get_schema(collection).await?;

    let lhs = if schema.is_json_blob {
        format!("json_extract(data, '$.{}')", field)
    } else {
        format!("\"{}\"", field)
    };
    let sql = format!("SELECT * FROM \"{}\" WHERE {} = ?1", collection, lhs);

    let rows = self.query(&sql, vec![value.clone()]).await?;
    let mut matches = Vec::with_capacity(rows.len());
    for row in &rows {
        matches.push(Self::row_to_item_with_schema(row, &schema));
    }
    Ok(matches)
}
/// Returns the first item in `collection` whose `field` equals `value`, or
/// `None` when nothing matches. The statement carries `LIMIT 1`.
///
/// # Errors
/// Fails when `collection` or `field` is not a safe identifier, or when a
/// query against D1 fails.
pub async fn find_one_by(
    &self,
    collection: &str,
    field: &str,
    value: &Value,
) -> Result<Option<Value>> {
    validate_identifier(collection)?;
    validate_identifier(field)?;
    let schema = self.get_schema(collection).await?;

    let lhs = if schema.is_json_blob {
        format!("json_extract(data, '$.{}')", field)
    } else {
        format!("\"{}\"", field)
    };
    let sql = format!(
        "SELECT * FROM \"{}\" WHERE {} = ?1 LIMIT 1",
        collection, lhs
    );

    let rows = self.query(&sql, vec![value.clone()]).await?;
    Ok(rows
        .first()
        .map(|row| Self::row_to_item_with_schema(row, &schema)))
}
/// Inserts `item` into `collection` and returns the stored item.
///
/// Any `id` key on an object item is removed before inserting, so the id
/// comes from the database.
///
/// * JSON-blob tables: the item is serialized into `data`; the returned item
///   is the input with the new `id` attached (or `0` if no id came back).
/// * Columnar tables: only keys that name a known column are inserted; the
///   returned value is the row reported by `RETURNING *`, or the input item
///   if no row came back. When no key matches a column the row is inserted
///   with `DEFAULT VALUES`, because an empty column/value list is not valid
///   SQL.
///
/// # Errors
/// Fails when `collection` is not a safe identifier, when serialization
/// fails, or when a query against D1 fails.
pub async fn create(&self, collection: &str, mut item: Value) -> Result<Value> {
    validate_identifier(collection)?;
    let schema = self.get_schema(collection).await?;

    if let Value::Object(ref mut map) = item {
        map.remove("id");
    }

    if schema.is_json_blob {
        let data_str = serde_json::to_string(&item)?;
        let rows = self
            .query(
                &format!(
                    "INSERT INTO \"{}\" (data) VALUES (?1) RETURNING id",
                    collection
                ),
                vec![json!(data_str)],
            )
            .await?;
        let id = rows
            .first()
            .and_then(|r| r.get("id"))
            .cloned()
            .unwrap_or(json!(0));
        if let Value::Object(ref mut map) = item {
            map.insert("id".to_string(), id);
        }
        return Ok(item);
    }

    let mut col_names = Vec::new();
    let mut placeholders = Vec::new();
    let mut params = Vec::new();
    if let Value::Object(ref map) = item {
        for (key, val) in map.iter() {
            // Keys are only spliced into SQL when they name a real column.
            if schema.columns.contains(key) {
                col_names.push(format!("\"{}\"", key));
                placeholders.push(format!("?{}", params.len() + 1));
                params.push(val.clone());
            }
        }
    }

    let sql = if col_names.is_empty() {
        format!("INSERT INTO \"{}\" DEFAULT VALUES RETURNING *", collection)
    } else {
        format!(
            "INSERT INTO \"{}\" ({}) VALUES ({}) RETURNING *",
            collection,
            col_names.join(", "),
            placeholders.join(", ")
        )
    };
    let rows = self.query(&sql, params).await?;
    Ok(rows.into_iter().next().unwrap_or(item))
}
/// Applies `updates` to the row of `collection` whose id equals `id`.
///
/// * JSON-blob tables: the current row is loaded, the keys of `updates` are
///   merged over it (when both are objects), and the merged document is
///   written back. Returns `Ok(None)` when the row does not exist. An `id`
///   key in `updates` is ignored so the returned item keeps the row's real
///   id, matching the columnar branch. The load and the write are two
///   separate requests.
/// * Columnar tables: every key of `updates` that names a known non-`id`
///   column is set in a single `UPDATE … RETURNING *`. Returns `Ok(None)`
///   when no such key exists or no row came back.
///
/// # Errors
/// Fails when `collection` is not a safe identifier, when serialization
/// fails, or when a query against D1 fails.
pub async fn update(
    &self,
    collection: &str,
    id: &Value,
    updates: Value,
) -> Result<Option<Value>> {
    validate_identifier(collection)?;
    let schema = self.get_schema(collection).await?;

    if schema.is_json_blob {
        let rows = self
            .query(
                &format!("SELECT * FROM \"{}\" WHERE id = ?1", collection),
                vec![id.clone()],
            )
            .await?;
        let Some(row) = rows.into_iter().next() else {
            return Ok(None);
        };

        let mut current = Self::row_to_item_with_schema(&row, &schema);
        if let (Value::Object(map), Value::Object(updates_map)) = (&mut current, &updates) {
            for (k, v) in updates_map {
                // The id lives in its own column and is never rewritten, so
                // do not let the payload change it in the returned item.
                if k != "id" {
                    map.insert(k.clone(), v.clone());
                }
            }
        }

        // The id is not duplicated inside the stored JSON document.
        let mut data_for_storage = current.clone();
        if let Value::Object(ref mut map) = data_for_storage {
            map.remove("id");
        }
        let data_str = serde_json::to_string(&data_for_storage)?;
        self.execute(
            &format!("UPDATE \"{}\" SET data = ?1 WHERE id = ?2", collection),
            vec![json!(data_str), id.clone()],
        )
        .await?;
        return Ok(Some(current));
    }

    let mut set_clauses = Vec::new();
    let mut params = Vec::new();
    if let Value::Object(ref map) = updates {
        for (key, val) in map {
            if key != "id" && schema.columns.contains(key) {
                set_clauses.push(format!("\"{}\" = ?{}", key, params.len() + 1));
                params.push(val.clone());
            }
        }
    }
    if set_clauses.is_empty() {
        return Ok(None);
    }

    // The id is bound last, right after the SET values.
    params.push(id.clone());
    let sql = format!(
        "UPDATE \"{}\" SET {} WHERE id = ?{} RETURNING *",
        collection,
        set_clauses.join(", "),
        params.len()
    );
    let rows = self.query(&sql, params).await?;
    Ok(rows.into_iter().next())
}
/// Deletes the row of `collection` whose id equals `id`.
///
/// Returns `true` when a row was deleted and `false` when no row matched.
/// The existence check and the delete happen in one statement
/// (`DELETE … RETURNING id`), so there is no window between checking and
/// deleting and only one request is sent.
///
/// # Errors
/// Fails when `collection` is not a safe identifier or the query fails.
pub async fn delete(&self, collection: &str, id: &Value) -> Result<bool> {
    validate_identifier(collection)?;
    let deleted = self
        .query(
            &format!("DELETE FROM \"{}\" WHERE id = ?1 RETURNING id", collection),
            vec![id.clone()],
        )
        .await?;
    Ok(!deleted.is_empty())
}
pub async fn set(&self, key: &str, value: Value) -> Result<()> {
let value_str = serde_json::to_string(&value)?;
self.execute(
"INSERT OR REPLACE INTO _kv_store (key, value) VALUES (?1, ?2)",
vec![json!(key), json!(value_str)],
)
.await
}
pub async fn get(&self, key: &str) -> Result<Option<Value>> {
let rows = self
.query(
"SELECT value FROM _kv_store WHERE key = ?1",
vec![json!(key)],
)
.await?;
Ok(rows.into_iter().next().and_then(|row| {
row.get("value")
.and_then(|v| v.as_str())
.and_then(|s| serde_json::from_str(s).ok())
}))
}
/// Removes `key` from `_kv_store` and returns the value it held.
///
/// The delete is only issued when [`Self::get`] returned a value; otherwise
/// `Ok(None)` is returned and nothing is deleted.
///
/// # Errors
/// Fails when the lookup or the delete fails.
pub async fn remove(&self, key: &str) -> Result<Option<Value>> {
    let previous = self.get(key).await?;
    if previous.is_none() {
        return Ok(None);
    }
    self.execute("DELETE FROM _kv_store WHERE key = ?1", vec![json!(key)])
        .await?;
    Ok(previous)
}
/// Reads the value under `key`, passes it to `f`, stores what `f` returns,
/// and returns that new value.
///
/// NOTE(review): despite the name, this is a `get` followed by a `set`,
/// i.e. two separate requests; nothing visible here prevents another writer
/// from changing the key in between.
///
/// # Errors
/// Fails when the read or the write fails.
pub async fn atomic_modify<F>(&self, key: &str, f: F) -> Result<Value>
where
    F: FnOnce(Option<&Value>) -> Value,
{
    let previous = self.get(key).await?;
    let updated = f(previous.as_ref());
    let to_store = updated.clone();
    self.set(key, to_store).await?;
    Ok(updated)
}
pub async fn as_context(&self) -> Result<HashMap<String, Value>> {
let mut context = HashMap::new();
let collections = self.query("SELECT name FROM _collections", vec![]).await?;
for row in collections {
if let Some(name) = row.get("name").and_then(|v| v.as_str()) {
if let Ok(items) = self.get_collection(name).await {
context.insert(name.to_string(), json!(items));
}
}
}
let kv_rows = self
.query("SELECT key, value FROM _kv_store", vec![])
.await?;
for row in kv_rows {
if let (Some(key), Some(value_str)) = (
row.get("key").and_then(|v| v.as_str()),
row.get("value").and_then(|v| v.as_str()),
) {
if let Ok(value) = serde_json::from_str::<Value>(value_str) {
context.insert(key.to_string(), value);
}
}
}
Ok(context)
}
/// Drops every cached table schema so the next lookup queries the database
/// again. A poisoned lock is recovered and cleared as well.
pub fn invalidate_schema_cache(&self) {
    match self.schema_cache.write() {
        Ok(mut cache) => cache.clear(),
        Err(poisoned) => poisoned.into_inner().clear(),
    }
}
/// Replaces the contents of collection `name` with `items`: deletes every
/// existing row, then inserts the items one by one via [`Self::create`].
///
/// # Errors
/// Fails when `name` is not a safe identifier, when the delete fails, or at
/// the first insert that fails (earlier inserts are not undone here).
pub async fn set_collection(&self, name: &str, items: Vec<Value>) -> Result<()> {
    validate_identifier(name)?;

    let wipe = format!("DELETE FROM \"{}\"", name);
    self.execute(&wipe, Vec::new()).await?;

    for replacement in items {
        self.create(name, replacement).await?;
    }
    Ok(())
}
}
/// Translates a filter expression into a SQL boolean expression, appending
/// the bound values to `params`.
///
/// The expression is split on `,` into OR-alternatives, and each alternative
/// is split on `&` into AND-terms handled by `build_d1_condition`. Terms
/// that produce no SQL are skipped, as are alternatives left with no terms.
/// Multi-term groups are parenthesized. Returns `None` when nothing usable
/// remains.
fn build_d1_filter(
    filter_expr: &str,
    params: &mut Vec<Value>,
    schema: &TableSchema,
) -> Option<String> {
    let mut alternatives: Vec<String> = Vec::new();

    for group in filter_expr.split(',') {
        let mut terms: Vec<String> = Vec::new();
        for raw_cond in group.split('&') {
            if let Some(term_sql) = build_d1_condition(raw_cond.trim(), params, schema) {
                terms.push(term_sql);
            }
        }
        match terms.len() {
            0 => {}
            1 => alternatives.push(terms.swap_remove(0)),
            _ => alternatives.push(format!("({})", terms.join(" AND "))),
        }
    }

    match alternatives.len() {
        0 => None,
        1 => alternatives.pop(),
        _ => Some(format!("({})", alternatives.join(" OR "))),
    }
}
/// Translates one `field<op>value` condition into SQL, where `<op>` is one
/// of `>=`, `<=`, `>`, `<`, `=`.
///
/// The condition is split at the LEFTMOST operator character, preferring the
/// two-character form at that position. Searching for operators in a fixed
/// priority order across the whole string would split `title=a>b` at `>`,
/// yield the invalid field `title=a`, and drop the condition.
///
/// The value is trimmed, pushed onto `params` as a JSON string, and
/// referenced by its 1-based position. The field is compared directly for
/// columnar tables and through `json_extract(data, '$.<field>')` for
/// JSON-blob tables.
///
/// Returns `None` (and leaves `params` untouched) when the condition has no
/// operator or the field is not a safe identifier.
fn build_d1_condition(cond: &str, params: &mut Vec<Value>, schema: &TableSchema) -> Option<String> {
    // Two-character operators come first so they win over their one-character prefix.
    const OPERATORS: [&str; 5] = [">=", "<=", ">", "<", "="];

    let op_start = cond.find(|c: char| matches!(c, '<' | '>' | '='))?;
    let tail = &cond[op_start..];
    let op = OPERATORS.iter().copied().find(|op| tail.starts_with(*op))?;

    let field = cond[..op_start].trim();
    let val = tail[op.len()..].trim();

    // The field is spliced into SQL text, so it must be a safe identifier.
    if !SAFE_IDENTIFIER_RE.is_match(field) {
        return None;
    }

    let idx = params.len() + 1;
    params.push(json!(val));

    let lhs = if schema.is_json_blob {
        format!("json_extract(data, '$.{}')", field)
    } else {
        format!("\"{}\"", field)
    };
    Some(format!("{} {} ?{}", lhs, op, idx))
}