Skip to main content

thread_flow/targets/
d1.rs

1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! D1 Target Factory - Cloudflare D1 distributed SQLite database target
5//!
6//! Implements ReCoco TargetFactoryBase for exporting code analysis results to
7//! Cloudflare D1 edge databases with content-addressed caching.
8
9use async_trait::async_trait;
10use recoco::base::schema::{BasicValueType, FieldSchema, ValueType};
11use recoco::base::value::{BasicValue, FieldValues, KeyValue, Value};
12use recoco::ops::factory_bases::TargetFactoryBase;
13use recoco::ops::interface::{
14    ExportTargetDeleteEntry, ExportTargetMutationWithContext, ExportTargetUpsertEntry,
15    FlowInstanceContext, SetupStateCompatibility,
16};
17use recoco::ops::sdk::{
18    TypedExportDataCollectionBuildOutput, TypedExportDataCollectionSpec,
19    TypedResourceSetupChangeItem,
20};
21use recoco::setup::{ChangeDescription, CombinedState, ResourceSetupChange, SetupChangeType};
22use recoco::utils::prelude::Error as RecocoError;
23use serde::{Deserialize, Serialize};
24use std::fmt::Debug;
25use std::hash::Hash;
26use std::sync::Arc;
27
28#[cfg(feature = "caching")]
29use crate::cache::{CacheConfig, QueryCache};
30
/// D1 Target Factory for Cloudflare D1 databases
///
/// Stateless unit type: all behavior lives in its `TargetFactoryBase`
/// implementation, which reports the target name `"d1"`.
#[derive(Debug, Clone)]
pub struct D1TargetFactory;
34
35/// D1 connection specification
36#[derive(Debug, Clone, Deserialize, Serialize)]
37pub struct D1Spec {
38    /// Cloudflare account ID
39    pub account_id: String,
40    /// D1 database ID
41    pub database_id: String,
42    /// API token for authentication
43    pub api_token: String,
44    /// Optional table name override
45    pub table_name: Option<String>,
46}
47
/// D1 table identifier (SetupKey)
///
/// Names one table inside one D1 database. Used as
/// `TargetFactoryBase::SetupKey` for `D1TargetFactory`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct D1TableId {
    /// ID of the D1 database that holds the table.
    pub database_id: String,
    /// Name of the table; emitted verbatim (unquoted) into generated SQL.
    pub table_name: String,
}
54
/// D1 table schema state
///
/// Describes the desired shape of one table. Used as
/// `TargetFactoryBase::SetupState` for `D1TargetFactory`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct D1SetupState {
    /// Table this state describes.
    pub table_id: D1TableId,
    /// Columns that together form the table's `PRIMARY KEY`.
    pub key_columns: Vec<ColumnSchema>,
    /// Remaining (non-key) columns.
    pub value_columns: Vec<ColumnSchema>,
    /// Secondary indexes; `D1SetupState::new` always leaves this empty.
    pub indexes: Vec<IndexSchema>,
}
63
/// Column schema definition
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnSchema {
    /// Column name; emitted verbatim (unquoted) into `CREATE TABLE`.
    pub name: String,
    /// SQL type keyword, as produced by `value_type_to_sql`.
    pub sql_type: String,
    /// When `false`, `NOT NULL` is appended to the column definition.
    pub nullable: bool,
    /// `true` for key columns and `false` for value columns, as set by
    /// `D1SetupState::new`. `create_table_sql` derives the primary key from
    /// `D1SetupState::key_columns`, not from this flag.
    pub primary_key: bool,
}
72
/// Index schema definition
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IndexSchema {
    /// Index name; emitted verbatim (unquoted) into `CREATE INDEX`.
    pub name: String,
    /// Indexed column names, in index order.
    pub columns: Vec<String>,
    /// When `true`, the index is created as `UNIQUE`.
    pub unique: bool,
}
80
/// D1 schema migration instructions (SetupChange)
///
/// Produced by `diff_setup_states`; used as `TargetFactoryBase::SetupChange`
/// for `D1TargetFactory`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct D1SetupChange {
    /// Table the statements below apply to.
    pub table_id: D1TableId,
    /// `CREATE TABLE` statement, present only when the table must be created.
    pub create_table_sql: Option<String>,
    /// `CREATE INDEX` statements to run.
    pub create_indexes_sql: Vec<String>,
    /// `ALTER TABLE` statements to run.
    // NOTE(review): nothing in this file ever populates this vector.
    pub alter_table_sql: Vec<String>,
}
89
90impl ResourceSetupChange for D1SetupChange {
91    fn describe_changes(&self) -> Vec<ChangeDescription> {
92        let mut changes = vec![];
93        if let Some(sql) = &self.create_table_sql {
94            changes.push(ChangeDescription::Action(format!("CREATE TABLE: {}", sql)));
95        }
96        for sql in &self.alter_table_sql {
97            changes.push(ChangeDescription::Action(format!("ALTER TABLE: {}", sql)));
98        }
99        for sql in &self.create_indexes_sql {
100            changes.push(ChangeDescription::Action(format!("CREATE INDEX: {}", sql)));
101        }
102        changes
103    }
104
105    fn change_type(&self) -> SetupChangeType {
106        if self.create_table_sql.is_some() {
107            SetupChangeType::Create
108        } else if !self.alter_table_sql.is_empty() || !self.create_indexes_sql.is_empty() {
109            SetupChangeType::Update
110        } else {
111            SetupChangeType::Invalid
112        }
113    }
114}
115
/// D1 export context (runtime state)
///
/// Carries everything needed to send SQL statements for one table to the
/// Cloudflare D1 HTTP query endpoint. Used as
/// `TargetFactoryBase::ExportContext` for `D1TargetFactory`.
pub struct D1ExportContext {
    /// D1 database ID; interpolated into the query endpoint URL.
    pub database_id: String,
    /// Target table name; interpolated verbatim (unquoted) into generated SQL.
    pub table_name: String,
    /// Cloudflare account ID; interpolated into the query endpoint URL.
    pub account_id: String,
    /// API token, sent as a `Bearer` value in the `Authorization` header.
    pub api_token: String,
    /// Shared HTTP client with connection pooling
    pub http_client: Arc<reqwest::Client>,
    /// Key field schemas; field `i` names the column for key part `i`.
    pub key_fields_schema: Vec<FieldSchema>,
    /// Value field schemas; field `i` names the column for value `i`.
    pub value_fields_schema: Vec<FieldSchema>,
    /// Receives one record per executed query and per cache hit/miss.
    pub metrics: crate::monitoring::performance::PerformanceMetrics,
    /// Cache of D1 responses keyed by the statement text followed by the
    /// `Debug` rendering of its parameters (only with the `caching` feature).
    #[cfg(feature = "caching")]
    pub query_cache: QueryCache<String, serde_json::Value>,
}
130
131impl D1ExportContext {
132    /// Create a new D1 export context with a shared HTTP client
133    #[allow(clippy::too_many_arguments)]
134    pub fn new(
135        database_id: String,
136        table_name: String,
137        account_id: String,
138        api_token: String,
139        http_client: Arc<reqwest::Client>,
140        key_fields_schema: Vec<FieldSchema>,
141        value_fields_schema: Vec<FieldSchema>,
142        metrics: crate::monitoring::performance::PerformanceMetrics,
143    ) -> Result<Self, RecocoError> {
144        #[cfg(feature = "caching")]
145        let query_cache = QueryCache::new(CacheConfig {
146            max_capacity: 10_000, // 10k query results
147            ttl_seconds: 300,     // 5 minutes
148        });
149
150        Ok(Self {
151            database_id,
152            table_name,
153            account_id,
154            api_token,
155            http_client,
156            key_fields_schema,
157            value_fields_schema,
158            metrics,
159            #[cfg(feature = "caching")]
160            query_cache,
161        })
162    }
163
164    /// Create a new D1 export context with a default HTTP client (for tests and examples)
165    pub fn new_with_default_client(
166        database_id: String,
167        table_name: String,
168        account_id: String,
169        api_token: String,
170        key_fields_schema: Vec<FieldSchema>,
171        value_fields_schema: Vec<FieldSchema>,
172        metrics: crate::monitoring::performance::PerformanceMetrics,
173    ) -> Result<Self, RecocoError> {
174        use std::time::Duration;
175
176        let http_client = Arc::new(
177            reqwest::Client::builder()
178                .pool_max_idle_per_host(10)
179                .pool_idle_timeout(Some(Duration::from_secs(90)))
180                .tcp_keepalive(Some(Duration::from_secs(60)))
181                .http2_keep_alive_interval(Some(Duration::from_secs(30)))
182                .timeout(Duration::from_secs(30))
183                .build()
184                .map_err(|e| {
185                    RecocoError::internal_msg(format!("Failed to create HTTP client: {}", e))
186                })?,
187        );
188
189        Self::new(
190            database_id,
191            table_name,
192            account_id,
193            api_token,
194            http_client,
195            key_fields_schema,
196            value_fields_schema,
197            metrics,
198        )
199    }
200
201    pub fn api_url(&self) -> String {
202        format!(
203            "https://api.cloudflare.com/client/v4/accounts/{}/d1/database/{}/query",
204            self.account_id, self.database_id
205        )
206    }
207
208    async fn execute_sql(
209        &self,
210        sql: &str,
211        params: Vec<serde_json::Value>,
212    ) -> Result<(), RecocoError> {
213        use std::time::Instant;
214
215        // Generate cache key from SQL + params
216        #[cfg(feature = "caching")]
217        let cache_key = format!("{}{:?}", sql, params);
218
219        // Check cache first (only for caching feature)
220        #[cfg(feature = "caching")]
221        {
222            if let Some(_cached_result) = self.query_cache.get(&cache_key).await {
223                // Cache hit - no need to query D1
224                self.metrics.record_cache_hit();
225                return Ok(());
226            }
227            self.metrics.record_cache_miss();
228        }
229
230        let start = Instant::now();
231
232        let request_body = serde_json::json!({
233            "sql": sql,
234            "params": params
235        });
236
237        let response = self
238            .http_client
239            .post(self.api_url())
240            .header("Authorization", format!("Bearer {}", self.api_token))
241            .header("Content-Type", "application/json")
242            .json(&request_body)
243            .send()
244            .await
245            .map_err(|e| {
246                self.metrics.record_query(start.elapsed(), false);
247                RecocoError::internal_msg(format!("D1 API request failed: {}", e))
248            })?;
249
250        if !response.status().is_success() {
251            let status = response.status();
252            let error_text = response
253                .text()
254                .await
255                .unwrap_or_else(|_| "Unknown error".to_string());
256            self.metrics.record_query(start.elapsed(), false);
257            return Err(RecocoError::internal_msg(format!(
258                "D1 API error ({}): {}",
259                status, error_text
260            )));
261        }
262
263        let result: serde_json::Value = response.json().await.map_err(|e| {
264            self.metrics.record_query(start.elapsed(), false);
265            RecocoError::internal_msg(format!("Failed to parse D1 response: {}", e))
266        })?;
267
268        if !result["success"].as_bool().unwrap_or(false) {
269            let errors = result["errors"].to_string();
270            self.metrics.record_query(start.elapsed(), false);
271            return Err(RecocoError::internal_msg(format!(
272                "D1 execution failed: {}",
273                errors
274            )));
275        }
276
277        self.metrics.record_query(start.elapsed(), true);
278
279        // Cache the successful result
280        #[cfg(feature = "caching")]
281        {
282            self.query_cache.insert(cache_key, result.clone()).await;
283        }
284
285        Ok(())
286    }
287
288    async fn execute_batch(
289        &self,
290        statements: Vec<(String, Vec<serde_json::Value>)>,
291    ) -> Result<(), RecocoError> {
292        for (sql, params) in statements {
293            self.execute_sql(&sql, params).await?;
294        }
295        Ok(())
296    }
297
298    pub fn build_upsert_stmt(
299        &self,
300        key: &KeyValue,
301        values: &FieldValues,
302    ) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
303        let mut columns = vec![];
304        let mut placeholders = vec![];
305        let mut params = vec![];
306        let mut update_clauses = vec![];
307
308        // Extract key parts - KeyValue is a wrapper around Box<[KeyPart]>
309        for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
310            if let Some(key_part) = key.0.get(idx) {
311                columns.push(self.key_fields_schema[idx].name.clone());
312                placeholders.push("?".to_string());
313                params.push(key_part_to_json(key_part)?);
314            }
315        }
316
317        // Add value fields
318        for (idx, value) in values.fields.iter().enumerate() {
319            if let Some(value_field) = self.value_fields_schema.get(idx) {
320                columns.push(value_field.name.clone());
321                placeholders.push("?".to_string());
322                params.push(value_to_json(value)?);
323                update_clauses.push(format!(
324                    "{} = excluded.{}",
325                    value_field.name, value_field.name
326                ));
327            }
328        }
329
330        let sql = format!(
331            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT DO UPDATE SET {}",
332            self.table_name,
333            columns.join(", "),
334            placeholders.join(", "),
335            update_clauses.join(", ")
336        );
337
338        Ok((sql, params))
339    }
340
341    pub fn build_delete_stmt(
342        &self,
343        key: &KeyValue,
344    ) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
345        let mut where_clauses = vec![];
346        let mut params = vec![];
347
348        for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
349            if let Some(key_part) = key.0.get(idx) {
350                where_clauses.push(format!("{} = ?", self.key_fields_schema[idx].name));
351                params.push(key_part_to_json(key_part)?);
352            }
353        }
354
355        let sql = format!(
356            "DELETE FROM {} WHERE {}",
357            self.table_name,
358            where_clauses.join(" AND ")
359        );
360
361        Ok((sql, params))
362    }
363
364    pub async fn upsert(&self, upserts: &[ExportTargetUpsertEntry]) -> Result<(), RecocoError> {
365        let statements = upserts
366            .iter()
367            .map(|entry| self.build_upsert_stmt(&entry.key, &entry.value))
368            .collect::<Result<Vec<_>, _>>()?;
369
370        let result = self.execute_batch(statements).await;
371
372        // Invalidate cache on successful mutation
373        #[cfg(feature = "caching")]
374        if result.is_ok() {
375            self.query_cache.clear().await;
376        }
377
378        result
379    }
380
381    pub async fn delete(&self, deletes: &[ExportTargetDeleteEntry]) -> Result<(), RecocoError> {
382        let statements = deletes
383            .iter()
384            .map(|entry| self.build_delete_stmt(&entry.key))
385            .collect::<Result<Vec<_>, _>>()?;
386
387        let result = self.execute_batch(statements).await;
388
389        // Invalidate cache on successful mutation
390        #[cfg(feature = "caching")]
391        if result.is_ok() {
392            self.query_cache.clear().await;
393        }
394
395        result
396    }
397
    /// Get cache statistics for monitoring
    ///
    /// Returns whatever the underlying `QueryCache::stats` reports for this
    /// context's cache. Only available with the `caching` feature.
    #[cfg(feature = "caching")]
    pub async fn cache_stats(&self) -> crate::cache::CacheStats {
        self.query_cache.stats().await
    }
403
    /// Manually clear the query cache
    ///
    /// Performs the same invalidation that `upsert` and `delete` run after a
    /// successful batch. Only available with the `caching` feature.
    #[cfg(feature = "caching")]
    pub async fn clear_cache(&self) {
        self.query_cache.clear().await;
    }
409}
410
411/// Convert KeyPart to JSON
412/// Made public for testing purposes
413pub fn key_part_to_json(
414    key_part: &recoco::base::value::KeyPart,
415) -> Result<serde_json::Value, RecocoError> {
416    use recoco::base::value::KeyPart;
417
418    Ok(match key_part {
419        KeyPart::Bytes(b) => {
420            use base64::Engine;
421            serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(b))
422        }
423        KeyPart::Str(s) => serde_json::Value::String(s.to_string()),
424        KeyPart::Bool(b) => serde_json::Value::Bool(*b),
425        KeyPart::Int64(i) => serde_json::Value::Number((*i).into()),
426        KeyPart::Range(range) => serde_json::json!([range.start, range.end]),
427        KeyPart::Uuid(uuid) => serde_json::Value::String(uuid.to_string()),
428        KeyPart::Date(date) => serde_json::Value::String(date.to_string()),
429        KeyPart::Struct(parts) => {
430            let json_parts: Result<Vec<_>, _> = parts.iter().map(key_part_to_json).collect();
431            serde_json::Value::Array(json_parts?)
432        }
433    })
434}
435
436/// Convert ReCoco Value to JSON for D1 API
437/// Made public for testing purposes
438pub fn value_to_json(value: &Value) -> Result<serde_json::Value, RecocoError> {
439    Ok(match value {
440        Value::Null => serde_json::Value::Null,
441        Value::Basic(basic) => basic_value_to_json(basic)?,
442        Value::Struct(field_values) => {
443            let fields: Result<Vec<_>, _> = field_values.fields.iter().map(value_to_json).collect();
444            serde_json::Value::Array(fields?)
445        }
446        Value::UTable(items) | Value::LTable(items) => {
447            let json_items: Result<Vec<_>, _> = items
448                .iter()
449                .map(|scope_val| {
450                    // ScopeValue(FieldValues)
451                    let fields: Result<Vec<_>, _> =
452                        scope_val.0.fields.iter().map(value_to_json).collect();
453                    fields.map(serde_json::Value::Array)
454                })
455                .collect();
456            serde_json::Value::Array(json_items?)
457        }
458        Value::KTable(map) => {
459            let mut json_map = serde_json::Map::new();
460            for (key, scope_val) in map {
461                let key_str = format!("{:?}", key); // Simple key representation
462                let fields: Result<Vec<_>, _> =
463                    scope_val.0.fields.iter().map(value_to_json).collect();
464                json_map.insert(key_str, serde_json::Value::Array(fields?));
465            }
466            serde_json::Value::Object(json_map)
467        }
468    })
469}
470
471/// Convert BasicValue to JSON
472/// Made public for testing purposes
473pub fn basic_value_to_json(value: &BasicValue) -> Result<serde_json::Value, RecocoError> {
474    Ok(match value {
475        BasicValue::Bool(b) => serde_json::Value::Bool(*b),
476        BasicValue::Int64(i) => serde_json::Value::Number((*i).into()),
477        BasicValue::Float32(f) => serde_json::Number::from_f64(*f as f64)
478            .map(serde_json::Value::Number)
479            .unwrap_or(serde_json::Value::Null),
480        BasicValue::Float64(f) => serde_json::Number::from_f64(*f)
481            .map(serde_json::Value::Number)
482            .unwrap_or(serde_json::Value::Null),
483        BasicValue::Str(s) => serde_json::Value::String(s.to_string()),
484        BasicValue::Bytes(b) => {
485            use base64::Engine;
486            serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(b))
487        }
488        BasicValue::Json(j) => (**j).clone(),
489        BasicValue::Vector(vec) => {
490            let json_vec: Result<Vec<_>, _> = vec.iter().map(basic_value_to_json).collect();
491            serde_json::Value::Array(json_vec?)
492        }
493        // Handle other BasicValue variants
494        _ => serde_json::Value::String(format!("{:?}", value)),
495    })
496}
497
498impl D1SetupState {
499    pub fn new(
500        table_id: &D1TableId,
501        key_fields: &[FieldSchema],
502        value_fields: &[FieldSchema],
503    ) -> Result<Self, RecocoError> {
504        let mut key_columns = vec![];
505        let mut value_columns = vec![];
506        let indexes = vec![];
507
508        for field in key_fields {
509            key_columns.push(ColumnSchema {
510                name: field.name.clone(),
511                // spellchecker:off
512                sql_type: value_type_to_sql(&field.value_type.typ),
513                // spellchecker:on
514                nullable: field.value_type.nullable,
515                primary_key: true,
516            });
517        }
518
519        for field in value_fields {
520            value_columns.push(ColumnSchema {
521                name: field.name.clone(),
522                // spellchecker:off
523                sql_type: value_type_to_sql(&field.value_type.typ),
524                // spellchecker:on
525                nullable: field.value_type.nullable,
526                primary_key: false,
527            });
528        }
529
530        Ok(Self {
531            table_id: table_id.clone(),
532            key_columns,
533            value_columns,
534            indexes,
535        })
536    }
537
538    pub fn create_table_sql(&self) -> String {
539        let mut columns = vec![];
540
541        for col in self.key_columns.iter().chain(self.value_columns.iter()) {
542            let mut col_def = format!("{} {}", col.name, col.sql_type);
543            if !col.nullable {
544                col_def.push_str(" NOT NULL");
545            }
546            columns.push(col_def);
547        }
548
549        if !self.key_columns.is_empty() {
550            let pk_cols: Vec<_> = self.key_columns.iter().map(|c| &c.name).collect();
551            columns.push(format!(
552                "PRIMARY KEY ({})",
553                pk_cols
554                    .iter()
555                    .map(|s| s.as_str())
556                    .collect::<Vec<_>>()
557                    .join(", ")
558            ));
559        }
560
561        format!(
562            "CREATE TABLE IF NOT EXISTS {} ({})",
563            self.table_id.table_name,
564            columns.join(", ")
565        )
566    }
567
568    pub fn create_indexes_sql(&self) -> Vec<String> {
569        self.indexes
570            .iter()
571            .map(|idx| {
572                let unique = if idx.unique { "UNIQUE " } else { "" };
573                format!(
574                    "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
575                    unique,
576                    idx.name,
577                    self.table_id.table_name,
578                    idx.columns.join(", ")
579                )
580            })
581            .collect()
582    }
583}
584
585/// Map ValueType to SQL type
586/// Made public for testing purposes
587pub fn value_type_to_sql(value_type: &ValueType) -> String {
588    match value_type {
589        ValueType::Basic(BasicValueType::Bool) => "INTEGER".to_string(),
590        ValueType::Basic(BasicValueType::Int64) => "INTEGER".to_string(),
591        ValueType::Basic(BasicValueType::Float32 | BasicValueType::Float64) => "REAL".to_string(),
592        ValueType::Basic(BasicValueType::Str) => "TEXT".to_string(),
593        ValueType::Basic(BasicValueType::Bytes) => "BLOB".to_string(),
594        ValueType::Basic(BasicValueType::Json) => "TEXT".to_string(),
595        _ => "TEXT".to_string(), // Default for complex types
596    }
597}
598
599#[async_trait]
600impl TargetFactoryBase for D1TargetFactory {
601    type Spec = D1Spec;
602    type DeclarationSpec = ();
603    type SetupKey = D1TableId;
604    type SetupState = D1SetupState;
605    type SetupChange = D1SetupChange;
606    type ExportContext = D1ExportContext;
607
    /// Returns the fixed identifier of this target kind: `"d1"`.
    fn name(&self) -> &str {
        "d1"
    }
611
612    async fn build(
613        self: Arc<Self>,
614        data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
615        _declarations: Vec<Self::DeclarationSpec>,
616        context: Arc<FlowInstanceContext>,
617    ) -> Result<
618        (
619            Vec<TypedExportDataCollectionBuildOutput<Self>>,
620            Vec<(Self::SetupKey, Self::SetupState)>,
621        ),
622        RecocoError,
623    > {
624        use std::time::Duration;
625
626        // Create shared HTTP client with connection pooling for all D1 contexts
627        // This ensures efficient connection reuse across all D1 table operations
628        let http_client = Arc::new(
629            reqwest::Client::builder()
630                // Connection pool configuration for Cloudflare D1 API
631                .pool_max_idle_per_host(10) // Max idle connections per host
632                .pool_idle_timeout(Some(Duration::from_secs(90))) // Keep connections warm
633                .tcp_keepalive(Some(Duration::from_secs(60))) // Prevent firewall timeouts
634                .http2_keep_alive_interval(Some(Duration::from_secs(30))) // HTTP/2 keep-alive pings
635                .timeout(Duration::from_secs(30)) // Per-request timeout
636                .build()
637                .map_err(|e| {
638                    RecocoError::internal_msg(format!("Failed to create HTTP client: {}", e))
639                })?,
640        );
641
642        let mut build_outputs = vec![];
643        let mut setup_states = vec![];
644
645        for collection_spec in data_collections {
646            let spec = collection_spec.spec.clone();
647
648            let table_name = spec.table_name.clone().unwrap_or_else(|| {
649                format!("{}_{}", context.flow_instance_name, collection_spec.name)
650            });
651
652            let table_id = D1TableId {
653                database_id: spec.database_id.clone(),
654                table_name: table_name.clone(),
655            };
656
657            let setup_state = D1SetupState::new(
658                &table_id,
659                &collection_spec.key_fields_schema,
660                &collection_spec.value_fields_schema,
661            )?;
662
663            let database_id = spec.database_id.clone();
664            let account_id = spec.account_id.clone();
665            let api_token = spec.api_token.clone();
666            let key_schema = collection_spec.key_fields_schema.to_vec();
667            let value_schema = collection_spec.value_fields_schema.clone();
668            let client = Arc::clone(&http_client);
669
670            let export_context = Box::pin(async move {
671                let metrics = crate::monitoring::performance::PerformanceMetrics::new();
672                D1ExportContext::new(
673                    database_id,
674                    table_name,
675                    account_id,
676                    api_token,
677                    client,
678                    key_schema,
679                    value_schema,
680                    metrics,
681                )
682                .map(Arc::new)
683            });
684
685            build_outputs.push(TypedExportDataCollectionBuildOutput {
686                setup_key: table_id.clone(),
687                desired_setup_state: setup_state.clone(),
688                export_context,
689            });
690
691            setup_states.push((table_id, setup_state));
692        }
693
694        Ok((build_outputs, setup_states))
695    }
696
697    async fn diff_setup_states(
698        &self,
699        _key: Self::SetupKey,
700        desired_state: Option<Self::SetupState>,
701        existing_states: CombinedState<Self::SetupState>,
702        _flow_instance_ctx: Arc<FlowInstanceContext>,
703    ) -> Result<Self::SetupChange, RecocoError> {
704        let desired = desired_state
705            .ok_or_else(|| RecocoError::client("No desired state provided for D1 table"))?;
706
707        let mut change = D1SetupChange {
708            table_id: desired.table_id.clone(),
709            create_table_sql: None,
710            create_indexes_sql: vec![],
711            alter_table_sql: vec![],
712        };
713
714        if existing_states.staging.is_empty() {
715            change.create_table_sql = Some(desired.create_table_sql());
716            change.create_indexes_sql = desired.create_indexes_sql();
717            return Ok(change);
718        }
719
720        if !existing_states.staging.is_empty() {
721            change.create_indexes_sql = desired.create_indexes_sql();
722        }
723
724        Ok(change)
725    }
726
727    fn check_state_compatibility(
728        &self,
729        desired_state: &Self::SetupState,
730        existing_state: &Self::SetupState,
731    ) -> Result<SetupStateCompatibility, RecocoError> {
732        if desired_state.key_columns != existing_state.key_columns
733            || desired_state.value_columns != existing_state.value_columns
734        {
735            return Ok(SetupStateCompatibility::PartialCompatible);
736        }
737
738        if desired_state.indexes != existing_state.indexes {
739            return Ok(SetupStateCompatibility::PartialCompatible);
740        }
741
742        Ok(SetupStateCompatibility::Compatible)
743    }
744
    /// Returns a human-readable label for the table, in the form
    /// `D1 table: {database_id}.{table_name}`. Never fails.
    fn describe_resource(&self, key: &Self::SetupKey) -> Result<String, RecocoError> {
        Ok(format!("D1 table: {}.{}", key.database_id, key.table_name))
    }
748
749    async fn apply_mutation(
750        &self,
751        mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
752    ) -> Result<(), RecocoError> {
753        let mut mutations_by_db: thread_utilities::RapidMap<
754            String,
755            Vec<&ExportTargetMutationWithContext<'_, Self::ExportContext>>,
756        > = thread_utilities::get_map();
757
758        for mutation in &mutations {
759            mutations_by_db
760                .entry(mutation.export_context.database_id.clone())
761                .or_default()
762                .push(mutation);
763        }
764
765        for (_db_id, db_mutations) in mutations_by_db {
766            for mutation in &db_mutations {
767                if !mutation.mutation.upserts.is_empty() {
768                    mutation
769                        .export_context
770                        .upsert(&mutation.mutation.upserts)
771                        .await?;
772                }
773            }
774
775            for mutation in &db_mutations {
776                if !mutation.mutation.deletes.is_empty() {
777                    mutation
778                        .export_context
779                        .delete(&mutation.mutation.deletes)
780                        .await?;
781                }
782            }
783        }
784
785        Ok(())
786    }
787
788    async fn apply_setup_changes(
789        &self,
790        changes: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
791        _context: Arc<FlowInstanceContext>,
792    ) -> Result<(), RecocoError> {
793        // Note: For D1, we need account_id and api_token which are not in the SetupKey
794        // This is a limitation - setup changes need to be applied manually or through
795        // the same export context used for mutations
796        // For now, we'll skip implementation as it requires additional context
797        // that's not available in this method signature
798
799        // TODO: Store API credentials in a way that's accessible during setup_changes
800        // OR require that setup_changes are only called after build() which creates
801        // the export_context
802
803        for change_item in changes {
804            eprintln!(
805                "D1 setup changes for {}.{}: {} operations",
806                change_item.setup_change.table_id.database_id,
807                change_item.setup_change.table_id.table_name,
808                change_item.setup_change.create_table_sql.is_some() as usize
809                    + change_item.setup_change.alter_table_sql.len()
810                    + change_item.setup_change.create_indexes_sql.len()
811            );
812        }
813
814        Ok(())
815    }
816}