use async_trait::async_trait;
use recoco::base::schema::{BasicValueType, FieldSchema, ValueType};
use recoco::base::value::{BasicValue, FieldValues, KeyValue, Value};
use recoco::ops::factory_bases::TargetFactoryBase;
use recoco::ops::interface::{
ExportTargetDeleteEntry, ExportTargetMutationWithContext, ExportTargetUpsertEntry,
FlowInstanceContext, SetupStateCompatibility,
};
use recoco::ops::sdk::{
TypedExportDataCollectionBuildOutput, TypedExportDataCollectionSpec,
TypedResourceSetupChangeItem,
};
use recoco::setup::{ChangeDescription, CombinedState, ResourceSetupChange, SetupChangeType};
use recoco::utils::prelude::Error as RecocoError;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
#[cfg(feature = "caching")]
use crate::cache::{CacheConfig, QueryCache};
/// Target factory that exports collection rows into Cloudflare D1 tables.
#[derive(Debug, Clone)]
pub struct D1TargetFactory;
/// User-supplied configuration for a D1 export target.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct D1Spec {
    /// Cloudflare account that owns the database.
    pub account_id: String,
    /// Identifier of the target D1 database.
    pub database_id: String,
    /// API token sent as a `Bearer` header on every request.
    pub api_token: String,
    /// Optional explicit table name; when `None` the factory derives
    /// `<flow_instance>_<collection>` (see `build`).
    pub table_name: Option<String>,
}
/// Identifies one table within one D1 database; used as the setup key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct D1TableId {
    pub database_id: String,
    pub table_name: String,
}
/// Desired schema for one D1 table: primary-key columns, value columns,
/// and secondary indexes. Persisted and compared to detect drift.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct D1SetupState {
    pub table_id: D1TableId,
    /// Columns that together form the composite PRIMARY KEY.
    pub key_columns: Vec<ColumnSchema>,
    /// Non-key data columns.
    pub value_columns: Vec<ColumnSchema>,
    /// Secondary indexes (currently always empty — see `D1SetupState::new`).
    pub indexes: Vec<IndexSchema>,
}
/// One column definition, used both for CREATE TABLE generation and for
/// state-compatibility comparison (`PartialEq`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnSchema {
    pub name: String,
    /// SQLite type affinity text, e.g. "INTEGER" / "REAL" / "TEXT" / "BLOB".
    pub sql_type: String,
    pub nullable: bool,
    pub primary_key: bool,
}
/// One secondary index definition for CREATE INDEX generation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IndexSchema {
    pub name: String,
    /// Indexed column names, in index order.
    pub columns: Vec<String>,
    pub unique: bool,
}
/// DDL plan produced by `diff_setup_states`; each field holds ready-to-run
/// SQL statement text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct D1SetupChange {
    pub table_id: D1TableId,
    /// `CREATE TABLE` statement when the table is considered new.
    pub create_table_sql: Option<String>,
    pub create_indexes_sql: Vec<String>,
    /// `ALTER TABLE` statements (never populated by the current diff logic).
    pub alter_table_sql: Vec<String>,
}
impl ResourceSetupChange for D1SetupChange {
    /// Renders every pending DDL statement as a human-readable action, in the
    /// order: CREATE TABLE, ALTER TABLE, CREATE INDEX.
    fn describe_changes(&self) -> Vec<ChangeDescription> {
        self.create_table_sql
            .iter()
            .map(|sql| format!("CREATE TABLE: {}", sql))
            .chain(
                self.alter_table_sql
                    .iter()
                    .map(|sql| format!("ALTER TABLE: {}", sql)),
            )
            .chain(
                self.create_indexes_sql
                    .iter()
                    .map(|sql| format!("CREATE INDEX: {}", sql)),
            )
            .map(ChangeDescription::Action)
            .collect()
    }

    /// Classifies the plan: table creation wins over incremental updates;
    /// a plan with no statements at all maps to `Invalid`.
    /// NOTE(review): confirm `Invalid` (rather than a no-op variant) is the
    /// intended classification for an empty plan.
    fn change_type(&self) -> SetupChangeType {
        let has_incremental =
            !self.alter_table_sql.is_empty() || !self.create_indexes_sql.is_empty();
        match (self.create_table_sql.is_some(), has_incremental) {
            (true, _) => SetupChangeType::Create,
            (false, true) => SetupChangeType::Update,
            (false, false) => SetupChangeType::Invalid,
        }
    }
}
/// Everything needed at export time to write one collection into one D1
/// table: connection details, column schemas, metrics, and (optionally) a
/// query cache.
pub struct D1ExportContext {
    pub database_id: String,
    pub table_name: String,
    pub account_id: String,
    /// Bearer token used on every API request.
    pub api_token: String,
    /// Pooled HTTP client; may be shared across contexts (see factory `build`).
    pub http_client: Arc<reqwest::Client>,
    /// Primary-key column schemas, in column order.
    pub key_fields_schema: Vec<FieldSchema>,
    /// Value column schemas, in column order.
    pub value_fields_schema: Vec<FieldSchema>,
    pub metrics: crate::monitoring::performance::PerformanceMetrics,
    /// Caches raw D1 responses keyed by "sql + params" text.
    #[cfg(feature = "caching")]
    pub query_cache: QueryCache<String, serde_json::Value>,
}
impl D1ExportContext {
    /// Builds an export context from pre-resolved connection details and an
    /// existing HTTP client.
    // Mirrors the struct's field list; only called from factory code.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        database_id: String,
        table_name: String,
        account_id: String,
        api_token: String,
        http_client: Arc<reqwest::Client>,
        key_fields_schema: Vec<FieldSchema>,
        value_fields_schema: Vec<FieldSchema>,
        metrics: crate::monitoring::performance::PerformanceMetrics,
    ) -> Result<Self, RecocoError> {
        // Bounded cache: 10k entries, 5-minute TTL. See the NOTE on
        // `execute_sql` for what this cache actually does in a write path.
        #[cfg(feature = "caching")]
        let query_cache = QueryCache::new(CacheConfig {
            max_capacity: 10_000,
            ttl_seconds: 300,
        });
        Ok(Self {
            database_id,
            table_name,
            account_id,
            api_token,
            http_client,
            key_fields_schema,
            value_fields_schema,
            metrics,
            #[cfg(feature = "caching")]
            query_cache,
        })
    }

    /// Like [`Self::new`], but constructs a pooled `reqwest` client tuned for
    /// repeated calls to the Cloudflare API (keep-alive, 30s request timeout).
    pub fn new_with_default_client(
        database_id: String,
        table_name: String,
        account_id: String,
        api_token: String,
        key_fields_schema: Vec<FieldSchema>,
        value_fields_schema: Vec<FieldSchema>,
        metrics: crate::monitoring::performance::PerformanceMetrics,
    ) -> Result<Self, RecocoError> {
        use std::time::Duration;
        let http_client = Arc::new(
            reqwest::Client::builder()
                .pool_max_idle_per_host(10)
                .pool_idle_timeout(Some(Duration::from_secs(90)))
                .tcp_keepalive(Some(Duration::from_secs(60)))
                .http2_keep_alive_interval(Some(Duration::from_secs(30)))
                .timeout(Duration::from_secs(30))
                .build()
                .map_err(|e| {
                    RecocoError::internal_msg(format!("Failed to create HTTP client: {}", e))
                })?,
        );
        Self::new(
            database_id,
            table_name,
            account_id,
            api_token,
            http_client,
            key_fields_schema,
            value_fields_schema,
            metrics,
        )
    }

    /// Cloudflare D1 query endpoint for this account/database.
    pub fn api_url(&self) -> String {
        format!(
            "https://api.cloudflare.com/client/v4/accounts/{}/d1/database/{}/query",
            self.account_id, self.database_id
        )
    }

    /// Sends one parameterized SQL statement to the D1 HTTP API, recording
    /// latency and success/failure in `metrics`.
    ///
    /// NOTE(review): with the `caching` feature enabled, a cache hit causes
    /// the statement to be *skipped entirely*. Since this context only issues
    /// writes, and `upsert`/`delete` clear the cache after each successful
    /// batch, the cache effectively deduplicates identical statements within
    /// a batch — confirm skipping a "duplicate" write is intended.
    async fn execute_sql(
        &self,
        sql: &str,
        params: Vec<serde_json::Value>,
    ) -> Result<(), RecocoError> {
        use std::time::Instant;
        #[cfg(feature = "caching")]
        let cache_key = format!("{}{:?}", sql, params);
        #[cfg(feature = "caching")]
        {
            if let Some(_cached_result) = self.query_cache.get(&cache_key).await {
                self.metrics.record_cache_hit();
                return Ok(());
            }
            self.metrics.record_cache_miss();
        }
        let start = Instant::now();
        let request_body = serde_json::json!({
            "sql": sql,
            "params": params
        });
        let response = self
            .http_client
            .post(self.api_url())
            .header("Authorization", format!("Bearer {}", self.api_token))
            .header("Content-Type", "application/json")
            .json(&request_body)
            .send()
            .await
            .map_err(|e| {
                self.metrics.record_query(start.elapsed(), false);
                RecocoError::internal_msg(format!("D1 API request failed: {}", e))
            })?;
        // Transport-level failure: non-2xx HTTP status.
        if !response.status().is_success() {
            let status = response.status();
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Unknown error".to_string());
            self.metrics.record_query(start.elapsed(), false);
            return Err(RecocoError::internal_msg(format!(
                "D1 API error ({}): {}",
                status, error_text
            )));
        }
        let result: serde_json::Value = response.json().await.map_err(|e| {
            self.metrics.record_query(start.elapsed(), false);
            RecocoError::internal_msg(format!("Failed to parse D1 response: {}", e))
        })?;
        // D1 reports SQL-level failure in the body even on HTTP 200.
        if !result["success"].as_bool().unwrap_or(false) {
            let errors = result["errors"].to_string();
            self.metrics.record_query(start.elapsed(), false);
            return Err(RecocoError::internal_msg(format!(
                "D1 execution failed: {}",
                errors
            )));
        }
        self.metrics.record_query(start.elapsed(), true);
        #[cfg(feature = "caching")]
        {
            self.query_cache.insert(cache_key, result.clone()).await;
        }
        Ok(())
    }

    /// Executes statements sequentially, stopping at the first failure.
    /// Statements are NOT wrapped in a transaction: a mid-batch failure
    /// leaves earlier statements applied.
    async fn execute_batch(
        &self,
        statements: Vec<(String, Vec<serde_json::Value>)>,
    ) -> Result<(), RecocoError> {
        for (sql, params) in statements {
            self.execute_sql(&sql, params).await?;
        }
        Ok(())
    }

    /// Builds a parameterized SQLite upsert for one row.
    ///
    /// Fix: SQLite requires an explicit conflict target for
    /// `ON CONFLICT ... DO UPDATE`, so the statement now names the key
    /// columns. When there are no value columns to update, it falls back to
    /// `DO NOTHING` (an empty `SET` list would be a syntax error).
    pub fn build_upsert_stmt(
        &self,
        key: &KeyValue,
        values: &FieldValues,
    ) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
        let mut columns = vec![];
        let mut placeholders = vec![];
        let mut params = vec![];
        let mut update_clauses = vec![];
        for (idx, key_field) in self.key_fields_schema.iter().enumerate() {
            // Schema columns beyond the supplied key's length are silently
            // skipped, matching the delete path.
            if let Some(key_part) = key.0.get(idx) {
                columns.push(key_field.name.clone());
                placeholders.push("?".to_string());
                params.push(key_part_to_json(key_part)?);
            }
        }
        for (idx, value) in values.fields.iter().enumerate() {
            if let Some(value_field) = self.value_fields_schema.get(idx) {
                columns.push(value_field.name.clone());
                placeholders.push("?".to_string());
                params.push(value_to_json(value)?);
                update_clauses.push(format!(
                    "{} = excluded.{}",
                    value_field.name, value_field.name
                ));
            }
        }
        // The conflict target is the full primary key declared in the schema.
        let conflict_target = self
            .key_fields_schema
            .iter()
            .map(|f| f.name.as_str())
            .collect::<Vec<_>>()
            .join(", ");
        let sql = if update_clauses.is_empty() {
            format!(
                "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO NOTHING",
                self.table_name,
                columns.join(", "),
                placeholders.join(", "),
                conflict_target
            )
        } else {
            format!(
                "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {}",
                self.table_name,
                columns.join(", "),
                placeholders.join(", "),
                conflict_target,
                update_clauses.join(", ")
            )
        };
        Ok((sql, params))
    }

    /// Builds a parameterized `DELETE` scoped to the row's full key.
    ///
    /// Returns an error when no key part matches the key schema — emitting a
    /// `DELETE` with an empty `WHERE` clause would be a syntax error.
    pub fn build_delete_stmt(
        &self,
        key: &KeyValue,
    ) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
        let mut where_clauses = vec![];
        let mut params = vec![];
        for (idx, key_field) in self.key_fields_schema.iter().enumerate() {
            if let Some(key_part) = key.0.get(idx) {
                where_clauses.push(format!("{} = ?", key_field.name));
                params.push(key_part_to_json(key_part)?);
            }
        }
        if where_clauses.is_empty() {
            return Err(RecocoError::internal_msg(format!(
                "Cannot build DELETE for table {}: key has no parts matching the key schema",
                self.table_name
            )));
        }
        let sql = format!(
            "DELETE FROM {} WHERE {}",
            self.table_name,
            where_clauses.join(" AND ")
        );
        Ok((sql, params))
    }

    /// Upserts a batch of rows, then invalidates the query cache (table
    /// contents have changed).
    pub async fn upsert(&self, upserts: &[ExportTargetUpsertEntry]) -> Result<(), RecocoError> {
        let statements = upserts
            .iter()
            .map(|entry| self.build_upsert_stmt(&entry.key, &entry.value))
            .collect::<Result<Vec<_>, _>>()?;
        let result = self.execute_batch(statements).await;
        #[cfg(feature = "caching")]
        if result.is_ok() {
            self.query_cache.clear().await;
        }
        result
    }

    /// Deletes a batch of rows by key, then invalidates the query cache.
    pub async fn delete(&self, deletes: &[ExportTargetDeleteEntry]) -> Result<(), RecocoError> {
        let statements = deletes
            .iter()
            .map(|entry| self.build_delete_stmt(&entry.key))
            .collect::<Result<Vec<_>, _>>()?;
        let result = self.execute_batch(statements).await;
        #[cfg(feature = "caching")]
        if result.is_ok() {
            self.query_cache.clear().await;
        }
        result
    }

    /// Snapshot of the query cache's statistics.
    #[cfg(feature = "caching")]
    pub async fn cache_stats(&self) -> crate::cache::CacheStats {
        self.query_cache.stats().await
    }

    /// Drops every cached entry.
    #[cfg(feature = "caching")]
    pub async fn clear_cache(&self) {
        self.query_cache.clear().await;
    }
}
pub fn key_part_to_json(
key_part: &recoco::base::value::KeyPart,
) -> Result<serde_json::Value, RecocoError> {
use recoco::base::value::KeyPart;
Ok(match key_part {
KeyPart::Bytes(b) => {
use base64::Engine;
serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(b))
}
KeyPart::Str(s) => serde_json::Value::String(s.to_string()),
KeyPart::Bool(b) => serde_json::Value::Bool(*b),
KeyPart::Int64(i) => serde_json::Value::Number((*i).into()),
KeyPart::Range(range) => serde_json::json!([range.start, range.end]),
KeyPart::Uuid(uuid) => serde_json::Value::String(uuid.to_string()),
KeyPart::Date(date) => serde_json::Value::String(date.to_string()),
KeyPart::Struct(parts) => {
let json_parts: Result<Vec<_>, _> = parts.iter().map(key_part_to_json).collect();
serde_json::Value::Array(json_parts?)
}
})
}
pub fn value_to_json(value: &Value) -> Result<serde_json::Value, RecocoError> {
Ok(match value {
Value::Null => serde_json::Value::Null,
Value::Basic(basic) => basic_value_to_json(basic)?,
Value::Struct(field_values) => {
let fields: Result<Vec<_>, _> = field_values.fields.iter().map(value_to_json).collect();
serde_json::Value::Array(fields?)
}
Value::UTable(items) | Value::LTable(items) => {
let json_items: Result<Vec<_>, _> = items
.iter()
.map(|scope_val| {
let fields: Result<Vec<_>, _> =
scope_val.0.fields.iter().map(value_to_json).collect();
fields.map(serde_json::Value::Array)
})
.collect();
serde_json::Value::Array(json_items?)
}
Value::KTable(map) => {
let mut json_map = serde_json::Map::new();
for (key, scope_val) in map {
let key_str = format!("{:?}", key); let fields: Result<Vec<_>, _> =
scope_val.0.fields.iter().map(value_to_json).collect();
json_map.insert(key_str, serde_json::Value::Array(fields?));
}
serde_json::Value::Object(json_map)
}
})
}
pub fn basic_value_to_json(value: &BasicValue) -> Result<serde_json::Value, RecocoError> {
Ok(match value {
BasicValue::Bool(b) => serde_json::Value::Bool(*b),
BasicValue::Int64(i) => serde_json::Value::Number((*i).into()),
BasicValue::Float32(f) => serde_json::Number::from_f64(*f as f64)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
BasicValue::Float64(f) => serde_json::Number::from_f64(*f)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
BasicValue::Str(s) => serde_json::Value::String(s.to_string()),
BasicValue::Bytes(b) => {
use base64::Engine;
serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(b))
}
BasicValue::Json(j) => (**j).clone(),
BasicValue::Vector(vec) => {
let json_vec: Result<Vec<_>, _> = vec.iter().map(basic_value_to_json).collect();
serde_json::Value::Array(json_vec?)
}
_ => serde_json::Value::String(format!("{:?}", value)),
})
}
impl D1SetupState {
    /// Derives the desired table schema from the collection's key/value
    /// fields. Key fields become primary-key columns; no secondary indexes
    /// are declared.
    pub fn new(
        table_id: &D1TableId,
        key_fields: &[FieldSchema],
        value_fields: &[FieldSchema],
    ) -> Result<Self, RecocoError> {
        let to_column = |field: &FieldSchema, primary_key: bool| ColumnSchema {
            name: field.name.clone(),
            sql_type: value_type_to_sql(&field.value_type.typ),
            nullable: field.value_type.nullable,
            primary_key,
        };
        Ok(Self {
            table_id: table_id.clone(),
            key_columns: key_fields.iter().map(|f| to_column(f, true)).collect(),
            value_columns: value_fields.iter().map(|f| to_column(f, false)).collect(),
            indexes: Vec::new(),
        })
    }

    /// Renders `CREATE TABLE IF NOT EXISTS ...` DDL for this schema, with a
    /// composite PRIMARY KEY over all key columns (when any exist).
    pub fn create_table_sql(&self) -> String {
        let mut column_defs: Vec<String> = self
            .key_columns
            .iter()
            .chain(&self.value_columns)
            .map(|col| {
                if col.nullable {
                    format!("{} {}", col.name, col.sql_type)
                } else {
                    format!("{} {} NOT NULL", col.name, col.sql_type)
                }
            })
            .collect();
        if !self.key_columns.is_empty() {
            let pk = self
                .key_columns
                .iter()
                .map(|c| c.name.as_str())
                .collect::<Vec<_>>()
                .join(", ");
            column_defs.push(format!("PRIMARY KEY ({})", pk));
        }
        format!(
            "CREATE TABLE IF NOT EXISTS {} ({})",
            self.table_id.table_name,
            column_defs.join(", ")
        )
    }

    /// Renders one `CREATE [UNIQUE] INDEX IF NOT EXISTS` statement per
    /// declared index.
    pub fn create_indexes_sql(&self) -> Vec<String> {
        self.indexes
            .iter()
            .map(|idx| {
                format!(
                    "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
                    if idx.unique { "UNIQUE " } else { "" },
                    idx.name,
                    self.table_id.table_name,
                    idx.columns.join(", ")
                )
            })
            .collect()
    }
}
/// Maps a recoco value type onto the SQLite type affinity used in D1 DDL.
/// Anything without a natural scalar mapping is stored as TEXT.
pub fn value_type_to_sql(value_type: &ValueType) -> String {
    let affinity = match value_type {
        ValueType::Basic(basic) => match basic {
            BasicValueType::Bool | BasicValueType::Int64 => "INTEGER",
            BasicValueType::Float32 | BasicValueType::Float64 => "REAL",
            BasicValueType::Str | BasicValueType::Json => "TEXT",
            BasicValueType::Bytes => "BLOB",
            _ => "TEXT",
        },
        _ => "TEXT",
    };
    affinity.to_string()
}
#[async_trait]
impl TargetFactoryBase for D1TargetFactory {
    type Spec = D1Spec;
    type DeclarationSpec = ();
    type SetupKey = D1TableId;
    type SetupState = D1SetupState;
    type SetupChange = D1SetupChange;
    type ExportContext = D1ExportContext;

    fn name(&self) -> &str {
        "d1"
    }

    /// Builds one export context + desired setup state per data collection.
    ///
    /// A single pooled HTTP client is shared across all collections. The
    /// table name defaults to `<flow_instance>_<collection>` when the spec
    /// does not pin one.
    async fn build(
        self: Arc<Self>,
        data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
        _declarations: Vec<Self::DeclarationSpec>,
        context: Arc<FlowInstanceContext>,
    ) -> Result<
        (
            Vec<TypedExportDataCollectionBuildOutput<Self>>,
            Vec<(Self::SetupKey, Self::SetupState)>,
        ),
        RecocoError,
    > {
        use std::time::Duration;
        // Keep-alive + pooling tuned for repeated calls to the Cloudflare
        // API endpoint; mirrors D1ExportContext::new_with_default_client.
        let http_client = Arc::new(
            reqwest::Client::builder()
                .pool_max_idle_per_host(10)
                .pool_idle_timeout(Some(Duration::from_secs(90)))
                .tcp_keepalive(Some(Duration::from_secs(60)))
                .http2_keep_alive_interval(Some(Duration::from_secs(30)))
                .timeout(Duration::from_secs(30))
                .build()
                .map_err(|e| {
                    RecocoError::internal_msg(format!("Failed to create HTTP client: {}", e))
                })?,
        );
        let mut build_outputs = vec![];
        let mut setup_states = vec![];
        for collection_spec in data_collections {
            let spec = collection_spec.spec.clone();
            let table_name = spec.table_name.clone().unwrap_or_else(|| {
                format!("{}_{}", context.flow_instance_name, collection_spec.name)
            });
            let table_id = D1TableId {
                database_id: spec.database_id.clone(),
                table_name: table_name.clone(),
            };
            let setup_state = D1SetupState::new(
                &table_id,
                &collection_spec.key_fields_schema,
                &collection_spec.value_fields_schema,
            )?;
            // Clone everything the deferred constructor needs, since the
            // future may outlive this loop iteration.
            let database_id = spec.database_id.clone();
            let account_id = spec.account_id.clone();
            let api_token = spec.api_token.clone();
            let key_schema = collection_spec.key_fields_schema.to_vec();
            let value_schema = collection_spec.value_fields_schema.clone();
            let client = Arc::clone(&http_client);
            let export_context = Box::pin(async move {
                let metrics = crate::monitoring::performance::PerformanceMetrics::new();
                D1ExportContext::new(
                    database_id,
                    table_name,
                    account_id,
                    api_token,
                    client,
                    key_schema,
                    value_schema,
                    metrics,
                )
                .map(Arc::new)
            });
            build_outputs.push(TypedExportDataCollectionBuildOutput {
                setup_key: table_id.clone(),
                desired_setup_state: setup_state.clone(),
                export_context,
            });
            setup_states.push((table_id, setup_state));
        }
        Ok((build_outputs, setup_states))
    }

    /// Computes the DDL needed to reach `desired_state`.
    ///
    /// NOTE(review): "does the table exist" is inferred solely from
    /// `existing_states.staging` being empty; `existing_states.current` is
    /// never consulted — confirm staging emptiness really implies the table
    /// has not been created. No ALTER TABLE diffing is performed yet.
    async fn diff_setup_states(
        &self,
        _key: Self::SetupKey,
        desired_state: Option<Self::SetupState>,
        existing_states: CombinedState<Self::SetupState>,
        _flow_instance_ctx: Arc<FlowInstanceContext>,
    ) -> Result<Self::SetupChange, RecocoError> {
        let desired = desired_state
            .ok_or_else(|| RecocoError::client("No desired state provided for D1 table"))?;
        let mut change = D1SetupChange {
            table_id: desired.table_id.clone(),
            create_table_sql: None,
            create_indexes_sql: vec![],
            alter_table_sql: vec![],
        };
        if existing_states.staging.is_empty() {
            // Treated as a brand-new table: emit the full CREATE TABLE.
            change.create_table_sql = Some(desired.create_table_sql());
        }
        // Index DDL uses IF NOT EXISTS, so it is emitted unconditionally
        // (this preserves the original behavior of both branches).
        change.create_indexes_sql = desired.create_indexes_sql();
        Ok(change)
    }

    /// Schemas that differ in any column or index are reported as partially
    /// compatible; identical schemas are fully compatible.
    fn check_state_compatibility(
        &self,
        desired_state: &Self::SetupState,
        existing_state: &Self::SetupState,
    ) -> Result<SetupStateCompatibility, RecocoError> {
        let schema_matches = desired_state.key_columns == existing_state.key_columns
            && desired_state.value_columns == existing_state.value_columns
            && desired_state.indexes == existing_state.indexes;
        Ok(if schema_matches {
            SetupStateCompatibility::Compatible
        } else {
            SetupStateCompatibility::PartialCompatible
        })
    }

    fn describe_resource(&self, key: &Self::SetupKey) -> Result<String, RecocoError> {
        Ok(format!("D1 table: {}.{}", key.database_id, key.table_name))
    }

    /// Applies mutations grouped per database: all upserts for a database
    /// land before any of its deletes.
    async fn apply_mutation(
        &self,
        mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
    ) -> Result<(), RecocoError> {
        let mut mutations_by_db: thread_utilities::RapidMap<
            String,
            Vec<&ExportTargetMutationWithContext<'_, Self::ExportContext>>,
        > = thread_utilities::get_map();
        for mutation in &mutations {
            mutations_by_db
                .entry(mutation.export_context.database_id.clone())
                .or_default()
                .push(mutation);
        }
        for (_db_id, db_mutations) in mutations_by_db {
            for mutation in &db_mutations {
                if !mutation.mutation.upserts.is_empty() {
                    mutation
                        .export_context
                        .upsert(&mutation.mutation.upserts)
                        .await?;
                }
            }
            for mutation in &db_mutations {
                if !mutation.mutation.deletes.is_empty() {
                    mutation
                        .export_context
                        .delete(&mutation.mutation.deletes)
                        .await?;
                }
            }
        }
        Ok(())
    }

    /// NOTE(review): this only *logs* the planned DDL — no statement is sent
    /// to D1. If setup is expected to actually create tables/indexes, the SQL
    /// in each change item must be executed here; confirm against the
    /// framework's contract for `apply_setup_changes`.
    async fn apply_setup_changes(
        &self,
        changes: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
        _context: Arc<FlowInstanceContext>,
    ) -> Result<(), RecocoError> {
        for change_item in changes {
            let change = &change_item.setup_change;
            let op_count = change.create_table_sql.is_some() as usize
                + change.alter_table_sql.len()
                + change.create_indexes_sql.len();
            eprintln!(
                "D1 setup changes for {}.{}: {} operations",
                change.table_id.database_id, change.table_id.table_name, op_count
            );
        }
        Ok(())
    }
}