1use async_trait::async_trait;
10use recoco::base::schema::{BasicValueType, FieldSchema, ValueType};
11use recoco::base::value::{BasicValue, FieldValues, KeyValue, Value};
12use recoco::ops::factory_bases::TargetFactoryBase;
13use recoco::ops::interface::{
14 ExportTargetDeleteEntry, ExportTargetMutationWithContext, ExportTargetUpsertEntry,
15 FlowInstanceContext, SetupStateCompatibility,
16};
17use recoco::ops::sdk::{
18 TypedExportDataCollectionBuildOutput, TypedExportDataCollectionSpec,
19 TypedResourceSetupChangeItem,
20};
21use recoco::setup::{ChangeDescription, CombinedState, ResourceSetupChange, SetupChangeType};
22use recoco::utils::prelude::Error as RecocoError;
23use serde::{Deserialize, Serialize};
24use std::fmt::Debug;
25use std::hash::Hash;
26use std::sync::Arc;
27
28#[cfg(feature = "caching")]
29use crate::cache::{CacheConfig, QueryCache};
30
/// Factory that creates Cloudflare D1 export targets for recoco flows.
#[derive(Debug, Clone)]
pub struct D1TargetFactory;
34
/// User-provided configuration for a D1 export target.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct D1Spec {
    /// Cloudflare account identifier.
    pub account_id: String,
    /// D1 database identifier.
    pub database_id: String,
    /// API token sent as a Bearer credential to the Cloudflare REST API.
    pub api_token: String,
    /// Target table name; when `None`, a name is derived from the flow
    /// instance name and the collection name (see `build`).
    pub table_name: Option<String>,
}
47
/// Unique identifier of a table within a D1 database; used as the setup key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct D1TableId {
    /// D1 database identifier.
    pub database_id: String,
    /// Table name within that database.
    pub table_name: String,
}
54
/// Desired (or recorded) schema state for one D1 table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct D1SetupState {
    /// Table this state describes.
    pub table_id: D1TableId,
    /// Primary-key columns, in key order.
    pub key_columns: Vec<ColumnSchema>,
    /// Non-key value columns, in field order.
    pub value_columns: Vec<ColumnSchema>,
    /// Secondary indexes to create.
    pub indexes: Vec<IndexSchema>,
}
63
/// Schema of a single table column.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnSchema {
    /// Column name (emitted unquoted into generated SQL).
    pub name: String,
    /// SQLite storage type, e.g. "TEXT" or "INTEGER".
    pub sql_type: String,
    /// Whether NULL values are allowed.
    pub nullable: bool,
    /// Whether the column participates in the primary key.
    pub primary_key: bool,
}
72
/// Schema of a secondary index on a D1 table.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Indexed column names, in order.
    pub columns: Vec<String>,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
}
80
/// Concrete SQL operations needed to reconcile a table's setup state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct D1SetupChange {
    /// Table the change applies to.
    pub table_id: D1TableId,
    /// `CREATE TABLE` DDL, present only when the table must be created.
    pub create_table_sql: Option<String>,
    /// `CREATE INDEX` statements to (re)issue.
    pub create_indexes_sql: Vec<String>,
    /// `ALTER TABLE` statements for incremental schema changes.
    pub alter_table_sql: Vec<String>,
}
89
90impl ResourceSetupChange for D1SetupChange {
91 fn describe_changes(&self) -> Vec<ChangeDescription> {
92 let mut changes = vec![];
93 if let Some(sql) = &self.create_table_sql {
94 changes.push(ChangeDescription::Action(format!("CREATE TABLE: {}", sql)));
95 }
96 for sql in &self.alter_table_sql {
97 changes.push(ChangeDescription::Action(format!("ALTER TABLE: {}", sql)));
98 }
99 for sql in &self.create_indexes_sql {
100 changes.push(ChangeDescription::Action(format!("CREATE INDEX: {}", sql)));
101 }
102 changes
103 }
104
105 fn change_type(&self) -> SetupChangeType {
106 if self.create_table_sql.is_some() {
107 SetupChangeType::Create
108 } else if !self.alter_table_sql.is_empty() || !self.create_indexes_sql.is_empty() {
109 SetupChangeType::Update
110 } else {
111 SetupChangeType::Invalid
112 }
113 }
114}
115
/// Per-table runtime context used to push rows into a D1 database.
pub struct D1ExportContext {
    /// D1 database identifier (also used to group mutations per database).
    pub database_id: String,
    /// Destination table name.
    pub table_name: String,
    /// Cloudflare account the database belongs to.
    pub account_id: String,
    /// Bearer token for the Cloudflare REST API.
    pub api_token: String,
    /// Shared, pooled HTTP client.
    pub http_client: Arc<reqwest::Client>,
    /// Schemas of the key columns, in key order.
    pub key_fields_schema: Vec<FieldSchema>,
    /// Schemas of the value columns, in field order.
    pub value_fields_schema: Vec<FieldSchema>,
    /// Query/cache metrics sink.
    pub metrics: crate::monitoring::performance::PerformanceMetrics,
    /// Result cache keyed by SQL text + params (caching builds only).
    #[cfg(feature = "caching")]
    pub query_cache: QueryCache<String, serde_json::Value>,
}
130
131impl D1ExportContext {
132 #[allow(clippy::too_many_arguments)]
134 pub fn new(
135 database_id: String,
136 table_name: String,
137 account_id: String,
138 api_token: String,
139 http_client: Arc<reqwest::Client>,
140 key_fields_schema: Vec<FieldSchema>,
141 value_fields_schema: Vec<FieldSchema>,
142 metrics: crate::monitoring::performance::PerformanceMetrics,
143 ) -> Result<Self, RecocoError> {
144 #[cfg(feature = "caching")]
145 let query_cache = QueryCache::new(CacheConfig {
146 max_capacity: 10_000, ttl_seconds: 300, });
149
150 Ok(Self {
151 database_id,
152 table_name,
153 account_id,
154 api_token,
155 http_client,
156 key_fields_schema,
157 value_fields_schema,
158 metrics,
159 #[cfg(feature = "caching")]
160 query_cache,
161 })
162 }
163
164 pub fn new_with_default_client(
166 database_id: String,
167 table_name: String,
168 account_id: String,
169 api_token: String,
170 key_fields_schema: Vec<FieldSchema>,
171 value_fields_schema: Vec<FieldSchema>,
172 metrics: crate::monitoring::performance::PerformanceMetrics,
173 ) -> Result<Self, RecocoError> {
174 use std::time::Duration;
175
176 let http_client = Arc::new(
177 reqwest::Client::builder()
178 .pool_max_idle_per_host(10)
179 .pool_idle_timeout(Some(Duration::from_secs(90)))
180 .tcp_keepalive(Some(Duration::from_secs(60)))
181 .http2_keep_alive_interval(Some(Duration::from_secs(30)))
182 .timeout(Duration::from_secs(30))
183 .build()
184 .map_err(|e| {
185 RecocoError::internal_msg(format!("Failed to create HTTP client: {}", e))
186 })?,
187 );
188
189 Self::new(
190 database_id,
191 table_name,
192 account_id,
193 api_token,
194 http_client,
195 key_fields_schema,
196 value_fields_schema,
197 metrics,
198 )
199 }
200
201 pub fn api_url(&self) -> String {
202 format!(
203 "https://api.cloudflare.com/client/v4/accounts/{}/d1/database/{}/query",
204 self.account_id, self.database_id
205 )
206 }
207
    /// Execute a single SQL statement against the D1 HTTP query endpoint.
    ///
    /// Records latency and success/failure metrics for every attempt and
    /// distinguishes three failure modes: transport errors, non-2xx HTTP
    /// responses, and in-band `success: false` results.
    ///
    /// NOTE(review): with the `caching` feature, a hit on (sql, params)
    /// returns Ok without contacting D1 at all. This method is only invoked
    /// for INSERT/DELETE statements (via `execute_batch`), so a cache hit
    /// silently skips a repeated mutation; `upsert`/`delete` clear the cache
    /// after each successful batch, limiting this to within-batch dedup.
    /// Confirm that skipping repeated mutations is intended.
    async fn execute_sql(
        &self,
        sql: &str,
        params: Vec<serde_json::Value>,
    ) -> Result<(), RecocoError> {
        use std::time::Instant;

        #[cfg(feature = "caching")]
        let cache_key = format!("{}{:?}", sql, params);

        #[cfg(feature = "caching")]
        {
            if let Some(_cached_result) = self.query_cache.get(&cache_key).await {
                self.metrics.record_cache_hit();
                return Ok(());
            }
            self.metrics.record_cache_miss();
        }

        let start = Instant::now();

        let request_body = serde_json::json!({
            "sql": sql,
            "params": params
        });

        let response = self
            .http_client
            .post(self.api_url())
            .header("Authorization", format!("Bearer {}", self.api_token))
            .header("Content-Type", "application/json")
            .json(&request_body)
            .send()
            .await
            .map_err(|e| {
                self.metrics.record_query(start.elapsed(), false);
                RecocoError::internal_msg(format!("D1 API request failed: {}", e))
            })?;

        // Non-2xx HTTP status: surface the response body as the error detail.
        if !response.status().is_success() {
            let status = response.status();
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Unknown error".to_string());
            self.metrics.record_query(start.elapsed(), false);
            return Err(RecocoError::internal_msg(format!(
                "D1 API error ({}): {}",
                status, error_text
            )));
        }

        let result: serde_json::Value = response.json().await.map_err(|e| {
            self.metrics.record_query(start.elapsed(), false);
            RecocoError::internal_msg(format!("Failed to parse D1 response: {}", e))
        })?;

        // D1 reports logical failures in-band via the `success` field.
        if !result["success"].as_bool().unwrap_or(false) {
            let errors = result["errors"].to_string();
            self.metrics.record_query(start.elapsed(), false);
            return Err(RecocoError::internal_msg(format!(
                "D1 execution failed: {}",
                errors
            )));
        }

        self.metrics.record_query(start.elapsed(), true);

        #[cfg(feature = "caching")]
        {
            self.query_cache.insert(cache_key, result.clone()).await;
        }

        Ok(())
    }
287
288 async fn execute_batch(
289 &self,
290 statements: Vec<(String, Vec<serde_json::Value>)>,
291 ) -> Result<(), RecocoError> {
292 for (sql, params) in statements {
293 self.execute_sql(&sql, params).await?;
294 }
295 Ok(())
296 }
297
298 pub fn build_upsert_stmt(
299 &self,
300 key: &KeyValue,
301 values: &FieldValues,
302 ) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
303 let mut columns = vec![];
304 let mut placeholders = vec![];
305 let mut params = vec![];
306 let mut update_clauses = vec![];
307
308 for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
310 if let Some(key_part) = key.0.get(idx) {
311 columns.push(self.key_fields_schema[idx].name.clone());
312 placeholders.push("?".to_string());
313 params.push(key_part_to_json(key_part)?);
314 }
315 }
316
317 for (idx, value) in values.fields.iter().enumerate() {
319 if let Some(value_field) = self.value_fields_schema.get(idx) {
320 columns.push(value_field.name.clone());
321 placeholders.push("?".to_string());
322 params.push(value_to_json(value)?);
323 update_clauses.push(format!(
324 "{} = excluded.{}",
325 value_field.name, value_field.name
326 ));
327 }
328 }
329
330 let sql = format!(
331 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT DO UPDATE SET {}",
332 self.table_name,
333 columns.join(", "),
334 placeholders.join(", "),
335 update_clauses.join(", ")
336 );
337
338 Ok((sql, params))
339 }
340
341 pub fn build_delete_stmt(
342 &self,
343 key: &KeyValue,
344 ) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
345 let mut where_clauses = vec![];
346 let mut params = vec![];
347
348 for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
349 if let Some(key_part) = key.0.get(idx) {
350 where_clauses.push(format!("{} = ?", self.key_fields_schema[idx].name));
351 params.push(key_part_to_json(key_part)?);
352 }
353 }
354
355 let sql = format!(
356 "DELETE FROM {} WHERE {}",
357 self.table_name,
358 where_clauses.join(" AND ")
359 );
360
361 Ok((sql, params))
362 }
363
364 pub async fn upsert(&self, upserts: &[ExportTargetUpsertEntry]) -> Result<(), RecocoError> {
365 let statements = upserts
366 .iter()
367 .map(|entry| self.build_upsert_stmt(&entry.key, &entry.value))
368 .collect::<Result<Vec<_>, _>>()?;
369
370 let result = self.execute_batch(statements).await;
371
372 #[cfg(feature = "caching")]
374 if result.is_ok() {
375 self.query_cache.clear().await;
376 }
377
378 result
379 }
380
381 pub async fn delete(&self, deletes: &[ExportTargetDeleteEntry]) -> Result<(), RecocoError> {
382 let statements = deletes
383 .iter()
384 .map(|entry| self.build_delete_stmt(&entry.key))
385 .collect::<Result<Vec<_>, _>>()?;
386
387 let result = self.execute_batch(statements).await;
388
389 #[cfg(feature = "caching")]
391 if result.is_ok() {
392 self.query_cache.clear().await;
393 }
394
395 result
396 }
397
    /// Snapshot of cache hit/miss statistics (caching builds only).
    #[cfg(feature = "caching")]
    pub async fn cache_stats(&self) -> crate::cache::CacheStats {
        self.query_cache.stats().await
    }

    /// Drop all cached query results (caching builds only).
    #[cfg(feature = "caching")]
    pub async fn clear_cache(&self) {
        self.query_cache.clear().await;
    }
409}
410
411pub fn key_part_to_json(
414 key_part: &recoco::base::value::KeyPart,
415) -> Result<serde_json::Value, RecocoError> {
416 use recoco::base::value::KeyPart;
417
418 Ok(match key_part {
419 KeyPart::Bytes(b) => {
420 use base64::Engine;
421 serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(b))
422 }
423 KeyPart::Str(s) => serde_json::Value::String(s.to_string()),
424 KeyPart::Bool(b) => serde_json::Value::Bool(*b),
425 KeyPart::Int64(i) => serde_json::Value::Number((*i).into()),
426 KeyPart::Range(range) => serde_json::json!([range.start, range.end]),
427 KeyPart::Uuid(uuid) => serde_json::Value::String(uuid.to_string()),
428 KeyPart::Date(date) => serde_json::Value::String(date.to_string()),
429 KeyPart::Struct(parts) => {
430 let json_parts: Result<Vec<_>, _> = parts.iter().map(key_part_to_json).collect();
431 serde_json::Value::Array(json_parts?)
432 }
433 })
434}
435
/// Convert a recoco `Value` into a JSON value for use as a D1 parameter.
///
/// Structs and table rows are encoded positionally as JSON arrays (field
/// order, not field names); KTables become JSON objects keyed by row key.
pub fn value_to_json(value: &Value) -> Result<serde_json::Value, RecocoError> {
    Ok(match value {
        Value::Null => serde_json::Value::Null,
        Value::Basic(basic) => basic_value_to_json(basic)?,
        Value::Struct(field_values) => {
            let fields: Result<Vec<_>, _> = field_values.fields.iter().map(value_to_json).collect();
            serde_json::Value::Array(fields?)
        }
        Value::UTable(items) | Value::LTable(items) => {
            let json_items: Result<Vec<_>, _> = items
                .iter()
                .map(|scope_val| {
                    let fields: Result<Vec<_>, _> =
                        scope_val.0.fields.iter().map(value_to_json).collect();
                    fields.map(serde_json::Value::Array)
                })
                .collect();
            serde_json::Value::Array(json_items?)
        }
        Value::KTable(map) => {
            let mut json_map = serde_json::Map::new();
            for (key, scope_val) in map {
                // NOTE(review): map keys use the key's Debug representation,
                // which is not a stable serialization format — confirm no
                // downstream consumer ever needs to parse these keys back.
                let key_str = format!("{:?}", key);
                let fields: Result<Vec<_>, _> =
                    scope_val.0.fields.iter().map(value_to_json).collect();
                json_map.insert(key_str, serde_json::Value::Array(fields?));
            }
            serde_json::Value::Object(json_map)
        }
    })
}
470
/// Convert a recoco `BasicValue` into a JSON value.
pub fn basic_value_to_json(value: &BasicValue) -> Result<serde_json::Value, RecocoError> {
    Ok(match value {
        BasicValue::Bool(b) => serde_json::Value::Bool(*b),
        BasicValue::Int64(i) => serde_json::Value::Number((*i).into()),
        // Non-finite floats (NaN / ±inf) have no JSON representation and map
        // to null.
        BasicValue::Float32(f) => serde_json::Number::from_f64(*f as f64)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        BasicValue::Float64(f) => serde_json::Number::from_f64(*f)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        BasicValue::Str(s) => serde_json::Value::String(s.to_string()),
        // Raw bytes are base64-encoded with the standard, padded alphabet.
        BasicValue::Bytes(b) => {
            use base64::Engine;
            serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(b))
        }
        BasicValue::Json(j) => (**j).clone(),
        BasicValue::Vector(vec) => {
            let json_vec: Result<Vec<_>, _> = vec.iter().map(basic_value_to_json).collect();
            serde_json::Value::Array(json_vec?)
        }
        // NOTE(review): any remaining variant falls back to its Debug string,
        // which is lossy — confirm which variants can actually reach here.
        _ => serde_json::Value::String(format!("{:?}", value)),
    })
}
497
498impl D1SetupState {
499 pub fn new(
500 table_id: &D1TableId,
501 key_fields: &[FieldSchema],
502 value_fields: &[FieldSchema],
503 ) -> Result<Self, RecocoError> {
504 let mut key_columns = vec![];
505 let mut value_columns = vec![];
506 let indexes = vec![];
507
508 for field in key_fields {
509 key_columns.push(ColumnSchema {
510 name: field.name.clone(),
511 sql_type: value_type_to_sql(&field.value_type.typ),
513 nullable: field.value_type.nullable,
515 primary_key: true,
516 });
517 }
518
519 for field in value_fields {
520 value_columns.push(ColumnSchema {
521 name: field.name.clone(),
522 sql_type: value_type_to_sql(&field.value_type.typ),
524 nullable: field.value_type.nullable,
526 primary_key: false,
527 });
528 }
529
530 Ok(Self {
531 table_id: table_id.clone(),
532 key_columns,
533 value_columns,
534 indexes,
535 })
536 }
537
538 pub fn create_table_sql(&self) -> String {
539 let mut columns = vec![];
540
541 for col in self.key_columns.iter().chain(self.value_columns.iter()) {
542 let mut col_def = format!("{} {}", col.name, col.sql_type);
543 if !col.nullable {
544 col_def.push_str(" NOT NULL");
545 }
546 columns.push(col_def);
547 }
548
549 if !self.key_columns.is_empty() {
550 let pk_cols: Vec<_> = self.key_columns.iter().map(|c| &c.name).collect();
551 columns.push(format!(
552 "PRIMARY KEY ({})",
553 pk_cols
554 .iter()
555 .map(|s| s.as_str())
556 .collect::<Vec<_>>()
557 .join(", ")
558 ));
559 }
560
561 format!(
562 "CREATE TABLE IF NOT EXISTS {} ({})",
563 self.table_id.table_name,
564 columns.join(", ")
565 )
566 }
567
568 pub fn create_indexes_sql(&self) -> Vec<String> {
569 self.indexes
570 .iter()
571 .map(|idx| {
572 let unique = if idx.unique { "UNIQUE " } else { "" };
573 format!(
574 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
575 unique,
576 idx.name,
577 self.table_id.table_name,
578 idx.columns.join(", ")
579 )
580 })
581 .collect()
582 }
583}
584
585pub fn value_type_to_sql(value_type: &ValueType) -> String {
588 match value_type {
589 ValueType::Basic(BasicValueType::Bool) => "INTEGER".to_string(),
590 ValueType::Basic(BasicValueType::Int64) => "INTEGER".to_string(),
591 ValueType::Basic(BasicValueType::Float32 | BasicValueType::Float64) => "REAL".to_string(),
592 ValueType::Basic(BasicValueType::Str) => "TEXT".to_string(),
593 ValueType::Basic(BasicValueType::Bytes) => "BLOB".to_string(),
594 ValueType::Basic(BasicValueType::Json) => "TEXT".to_string(),
595 _ => "TEXT".to_string(), }
597}
598
#[async_trait]
impl TargetFactoryBase for D1TargetFactory {
    type Spec = D1Spec;
    type DeclarationSpec = ();
    type SetupKey = D1TableId;
    type SetupState = D1SetupState;
    type SetupChange = D1SetupChange;
    type ExportContext = D1ExportContext;

    /// Target type identifier used to reference this factory in flow specs.
    fn name(&self) -> &str {
        "d1"
    }
611
612 async fn build(
613 self: Arc<Self>,
614 data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
615 _declarations: Vec<Self::DeclarationSpec>,
616 context: Arc<FlowInstanceContext>,
617 ) -> Result<
618 (
619 Vec<TypedExportDataCollectionBuildOutput<Self>>,
620 Vec<(Self::SetupKey, Self::SetupState)>,
621 ),
622 RecocoError,
623 > {
624 use std::time::Duration;
625
626 let http_client = Arc::new(
629 reqwest::Client::builder()
630 .pool_max_idle_per_host(10) .pool_idle_timeout(Some(Duration::from_secs(90))) .tcp_keepalive(Some(Duration::from_secs(60))) .http2_keep_alive_interval(Some(Duration::from_secs(30))) .timeout(Duration::from_secs(30)) .build()
637 .map_err(|e| {
638 RecocoError::internal_msg(format!("Failed to create HTTP client: {}", e))
639 })?,
640 );
641
642 let mut build_outputs = vec![];
643 let mut setup_states = vec![];
644
645 for collection_spec in data_collections {
646 let spec = collection_spec.spec.clone();
647
648 let table_name = spec.table_name.clone().unwrap_or_else(|| {
649 format!("{}_{}", context.flow_instance_name, collection_spec.name)
650 });
651
652 let table_id = D1TableId {
653 database_id: spec.database_id.clone(),
654 table_name: table_name.clone(),
655 };
656
657 let setup_state = D1SetupState::new(
658 &table_id,
659 &collection_spec.key_fields_schema,
660 &collection_spec.value_fields_schema,
661 )?;
662
663 let database_id = spec.database_id.clone();
664 let account_id = spec.account_id.clone();
665 let api_token = spec.api_token.clone();
666 let key_schema = collection_spec.key_fields_schema.to_vec();
667 let value_schema = collection_spec.value_fields_schema.clone();
668 let client = Arc::clone(&http_client);
669
670 let export_context = Box::pin(async move {
671 let metrics = crate::monitoring::performance::PerformanceMetrics::new();
672 D1ExportContext::new(
673 database_id,
674 table_name,
675 account_id,
676 api_token,
677 client,
678 key_schema,
679 value_schema,
680 metrics,
681 )
682 .map(Arc::new)
683 });
684
685 build_outputs.push(TypedExportDataCollectionBuildOutput {
686 setup_key: table_id.clone(),
687 desired_setup_state: setup_state.clone(),
688 export_context,
689 });
690
691 setup_states.push((table_id, setup_state));
692 }
693
694 Ok((build_outputs, setup_states))
695 }
696
697 async fn diff_setup_states(
698 &self,
699 _key: Self::SetupKey,
700 desired_state: Option<Self::SetupState>,
701 existing_states: CombinedState<Self::SetupState>,
702 _flow_instance_ctx: Arc<FlowInstanceContext>,
703 ) -> Result<Self::SetupChange, RecocoError> {
704 let desired = desired_state
705 .ok_or_else(|| RecocoError::client("No desired state provided for D1 table"))?;
706
707 let mut change = D1SetupChange {
708 table_id: desired.table_id.clone(),
709 create_table_sql: None,
710 create_indexes_sql: vec![],
711 alter_table_sql: vec![],
712 };
713
714 if existing_states.staging.is_empty() {
715 change.create_table_sql = Some(desired.create_table_sql());
716 change.create_indexes_sql = desired.create_indexes_sql();
717 return Ok(change);
718 }
719
720 if !existing_states.staging.is_empty() {
721 change.create_indexes_sql = desired.create_indexes_sql();
722 }
723
724 Ok(change)
725 }
726
727 fn check_state_compatibility(
728 &self,
729 desired_state: &Self::SetupState,
730 existing_state: &Self::SetupState,
731 ) -> Result<SetupStateCompatibility, RecocoError> {
732 if desired_state.key_columns != existing_state.key_columns
733 || desired_state.value_columns != existing_state.value_columns
734 {
735 return Ok(SetupStateCompatibility::PartialCompatible);
736 }
737
738 if desired_state.indexes != existing_state.indexes {
739 return Ok(SetupStateCompatibility::PartialCompatible);
740 }
741
742 Ok(SetupStateCompatibility::Compatible)
743 }
744
    /// Human-readable description of the table identified by `key`.
    fn describe_resource(&self, key: &Self::SetupKey) -> Result<String, RecocoError> {
        Ok(format!("D1 table: {}.{}", key.database_id, key.table_name))
    }
748
    /// Apply export mutations, grouped by target database.
    ///
    /// Within each database, all upsert batches are applied before any delete
    /// batches. The first failure aborts remaining work and is propagated.
    async fn apply_mutation(
        &self,
        mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
    ) -> Result<(), RecocoError> {
        // Group mutation references by database id so each database's
        // statements are applied together.
        let mut mutations_by_db: thread_utilities::RapidMap<
            String,
            Vec<&ExportTargetMutationWithContext<'_, Self::ExportContext>>,
        > = thread_utilities::get_map();

        for mutation in &mutations {
            mutations_by_db
                .entry(mutation.export_context.database_id.clone())
                .or_default()
                .push(mutation);
        }

        // NOTE(review): databases are processed in map iteration order, which
        // may be nondeterministic for a hash-based map — confirm cross-database
        // ordering does not matter.
        for (_db_id, db_mutations) in mutations_by_db {
            // Phase 1: upserts.
            for mutation in &db_mutations {
                if !mutation.mutation.upserts.is_empty() {
                    mutation
                        .export_context
                        .upsert(&mutation.mutation.upserts)
                        .await?;
                }
            }

            // Phase 2: deletes.
            for mutation in &db_mutations {
                if !mutation.mutation.deletes.is_empty() {
                    mutation
                        .export_context
                        .delete(&mutation.mutation.deletes)
                        .await?;
                }
            }
        }

        Ok(())
    }
787
    /// Report pending setup changes.
    ///
    /// NOTE(review): this only logs a one-line summary to stderr — the
    /// generated CREATE/ALTER/INDEX statements are never executed against D1.
    /// TODO: actually apply the SQL (this requires database credentials to be
    /// reachable from here), or document why setup is performed out-of-band.
    async fn apply_setup_changes(
        &self,
        changes: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
        _context: Arc<FlowInstanceContext>,
    ) -> Result<(), RecocoError> {
        for change_item in changes {
            eprintln!(
                "D1 setup changes for {}.{}: {} operations",
                change_item.setup_change.table_id.database_id,
                change_item.setup_change.table_id.table_name,
                // One op for the optional CREATE TABLE, plus each ALTER and
                // CREATE INDEX statement.
                change_item.setup_change.create_table_sql.is_some() as usize
                    + change_item.setup_change.alter_table_sql.len()
                    + change_item.setup_change.create_indexes_sql.len()
            );
        }

        Ok(())
    }
816}