hyprstream_core/storage/duckdb.rs

//! DuckDB storage backend implementation.
//!
//! This module provides a high-performance storage backend using DuckDB,
//! an embedded analytical database. The implementation supports:
//! - In-memory and persistent storage options
//! - Efficient batch operations
//! - SQL query capabilities
//! - Time-based filtering
//!
//! # Configuration
//!
//! The DuckDB backend can be configured using the following options:
//!
//! ```toml
//! [engine]
//! engine = "duckdb"
//! connection = ":memory:"  # Use ":memory:" for in-memory or a file path
//!
//! [engine.options]
//! threads = "4"        # Optional: Number of threads (default: 4)
//! read_only = "false"  # Optional: Read-only mode (default: false)
//! ```
//!
//! Or via command line:
//!
//! ```bash
//! hyprstream \
//!   --engine duckdb \
//!   --engine-connection ":memory:" \
//!   --engine-options threads=4 \
//!   --engine-options read_only=false
//! ```
//!
//! DuckDB is particularly well-suited for analytical workloads and
//! provides excellent performance for both caching and primary storage.
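//!
//! # Example
//!
//! A minimal end-to-end sketch (import paths follow this module's own
//! `crate::` layout, and `MetricRecord` is assumed to be constructible
//! directly; adjust both to the actual crate API). Timestamps are in
//! nanoseconds, matching the window arithmetic used for aggregations:
//!
//! ```rust,ignore
//! use crate::metrics::MetricRecord;
//! use crate::storage::StorageBackend;
//! use crate::storage::duckdb::DuckDbBackend;
//!
//! async fn example() -> Result<(), tonic::Status> {
//!     let backend = DuckDbBackend::new_in_memory()?;
//!     backend.init().await?; // ensure tables exist before the first insert
//!
//!     backend.insert_metrics(vec![MetricRecord {
//!         metric_id: "cpu.load".to_string(),
//!         timestamp: 1_700_000_000_000_000_000, // nanoseconds
//!         value_running_window_sum: 0.75,
//!         value_running_window_avg: 0.75,
//!         value_running_window_count: 1,
//!     }]).await?;
//!
//!     let recent = backend.query_metrics(0).await?;
//!     println!("{} metrics stored", recent.len());
//!     Ok(())
//! }
//! ```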

use std::collections::HashMap;
use std::sync::Arc;
use duckdb::{Connection, Config, params, ToSql};
use tokio::sync::Mutex;
use tonic::Status;
use crate::metrics::MetricRecord;
use crate::config::Credentials;
use crate::storage::{StorageBackend, BatchAggregation};
use crate::storage::cache::{CacheManager, CacheEviction};
use crate::storage::table_manager::{TableManager, AggregationView};
use crate::aggregation::{TimeWindow, AggregateFunction, GroupBy, AggregateResult, build_aggregate_query};
use async_trait::async_trait;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::array::{
    Array, ArrayRef, RecordBatch, Int64Array, Float64Array, StringArray,
};
use arrow::array::builder::{
    ArrayBuilder, Int64Builder, Float64Builder, StringBuilder,
};
use std::time::Duration;

/// DuckDB-based storage backend for metrics.
#[derive(Clone)]
pub struct DuckDbBackend {
    conn: Arc<Mutex<Connection>>,
    connection_string: String,
    options: HashMap<String, String>,
    cache_manager: CacheManager,
    table_manager: TableManager,
}

impl DuckDbBackend {
    /// Creates a new DuckDB backend instance.
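    ///
    /// `ttl` configures the cache manager's time-to-live; `None` disables
    /// TTL-based eviction. A construction sketch (the `threads` option is
    /// shown for illustration only; options are currently stored, not
    /// applied to the connection config):
    ///
    /// ```rust,ignore
    /// use std::collections::HashMap;
    ///
    /// let mut options = HashMap::new();
    /// options.insert("threads".to_string(), "4".to_string());
    /// let backend = DuckDbBackend::new("metrics.db".to_string(), options, None)?;
    /// ```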
    pub fn new(connection_string: String, options: HashMap<String, String>, ttl: Option<u64>) -> Result<Self, Status> {
        let config = Config::default();
        let conn = Connection::open_with_flags(&connection_string, config)
            .map_err(|e| Status::internal(e.to_string()))?;

        let backend = Self {
            conn: Arc::new(Mutex::new(conn)),
            connection_string,
            options,
            cache_manager: CacheManager::new(ttl),
            table_manager: TableManager::new(),
        };

        // Initialize tables on a background task. Callers that need the
        // tables immediately should await `init()` themselves, since this
        // task may not have completed by the time `new` returns.
        let backend_clone = backend.clone();
        tokio::spawn(async move {
            if let Err(e) = backend_clone.init().await {
                tracing::error!("Failed to initialize tables: {}", e);
            }
        });

        Ok(backend)
    }

    /// Creates a new DuckDB backend with an in-memory database.
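    ///
    /// Note that `new` schedules table creation in the background, so a
    /// caller that inserts immediately may want to await `init` first (a
    /// sketch, assuming a tokio runtime):
    ///
    /// ```rust,ignore
    /// let backend = DuckDbBackend::new_in_memory()?;
    /// backend.init().await?; // idempotent: CREATE TABLE IF NOT EXISTS
    /// ```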
    pub fn new_in_memory() -> Result<Self, Status> {
        Self::new(":memory:".to_string(), HashMap::new(), Some(0))
    }

    /// Inserts a batch of metrics with optimized aggregation updates.
    async fn insert_batch_optimized(&self, metrics: &[MetricRecord], window: TimeWindow) -> Result<(), Status> {
        let conn = self.conn.lock().await;

        // Begin transaction
        conn.execute("BEGIN TRANSACTION", params![])
            .map_err(|e| Status::internal(format!("Failed to begin transaction: {}", e)))?;

        // Stage the metrics in a RecordBatch, then bind them row by row
        let batch = Self::prepare_params(metrics)?;

        // Insert metrics using a prepared statement
        let mut stmt = conn.prepare(r#"
            INSERT INTO metrics (
                metric_id,
                timestamp,
                value_running_window_sum,
                value_running_window_avg,
                value_running_window_count
            ) VALUES (?, ?, ?, ?, ?)
        "#).map_err(|e| Status::internal(format!("Failed to prepare statement: {}", e)))?;

        for i in 0..batch.num_rows() {
            let metric_id = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap().value(i);
            let timestamp = batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().value(i);
            let sum = batch.column(2).as_any().downcast_ref::<Float64Array>().unwrap().value(i);
            let avg = batch.column(3).as_any().downcast_ref::<Float64Array>().unwrap().value(i);
            let count = batch.column(4).as_any().downcast_ref::<Int64Array>().unwrap().value(i);

            stmt.execute(params![
                metric_id,
                timestamp,
                sum,
                avg,
                count,
            ]).map_err(|e| Status::internal(format!("Failed to insert metrics: {}", e)))?;
        }

        // Derive the aggregation window bounds from the window type
        let window_start = match window {
            TimeWindow::Sliding { window, slide: _ } => {
                let now = metrics.iter().map(|m| m.timestamp).max().unwrap_or(0);
                now - window.as_nanos() as i64
            }
            TimeWindow::Fixed(start) => start.as_nanos() as i64,
            TimeWindow::None => metrics.iter().map(|m| m.timestamp).min().unwrap_or(0),
        };

        let window_end = match window {
            TimeWindow::Sliding { window: _, slide: _ } => {
                metrics.iter().map(|m| m.timestamp).max().unwrap_or(0)
            }
            TimeWindow::Fixed(end) => end.as_nanos() as i64,
            TimeWindow::None => metrics.iter().map(|m| m.timestamp).max().unwrap_or(0),
        };

        // Group metrics by ID and calculate aggregations
        let mut aggregations = HashMap::new();
        for metric in metrics {
            let entry = aggregations.entry(metric.metric_id.clone()).or_insert_with(|| BatchAggregation {
                metric_id: metric.metric_id.clone(),
                window_start,
                window_end,
                running_sum: 0.0,
                running_count: 0,
                min_value: f64::INFINITY,
                max_value: f64::NEG_INFINITY,
            });

            entry.running_sum += metric.value_running_window_sum;
            entry.running_count += metric.value_running_window_count;
            entry.min_value = entry.min_value.min(metric.value_running_window_sum);
            entry.max_value = entry.max_value.max(metric.value_running_window_sum);
        }

        // Upsert the per-metric aggregates for this window
        let mut agg_stmt = conn.prepare(r#"
            INSERT INTO metric_aggregations (
                metric_id, window_start, window_end,
                running_sum, running_count, min_value, max_value
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (metric_id, window_start, window_end) DO UPDATE
            SET running_sum = metric_aggregations.running_sum + EXCLUDED.running_sum,
                running_count = metric_aggregations.running_count + EXCLUDED.running_count,
                min_value = LEAST(metric_aggregations.min_value, EXCLUDED.min_value),
                max_value = GREATEST(metric_aggregations.max_value, EXCLUDED.max_value)
        "#).map_err(|e| Status::internal(format!("Failed to prepare aggregation statement: {}", e)))?;

        for agg in aggregations.values() {
            agg_stmt.execute(params![
                agg.metric_id,
                agg.window_start,
                agg.window_end,
                agg.running_sum,
                agg.running_count,
                agg.min_value,
                agg.max_value,
            ]).map_err(|e| Status::internal(format!("Failed to update aggregations: {}", e)))?;
        }

        // Commit transaction
        conn.execute("COMMIT", params![])
            .map_err(|e| Status::internal(format!("Failed to commit transaction: {}", e)))?;

        Ok(())
    }

    /// Prepares parameters for batch insertion.
    fn prepare_params(metrics: &[MetricRecord]) -> Result<RecordBatch, Status> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("metric_id", DataType::Utf8, false),
            Field::new("timestamp", DataType::Int64, false),
            Field::new("value_running_window_sum", DataType::Float64, false),
            Field::new("value_running_window_avg", DataType::Float64, false),
            Field::new("value_running_window_count", DataType::Int64, false),
        ]));

        let metric_ids = StringArray::from_iter_values(metrics.iter().map(|m| m.metric_id.as_str()));
        let timestamps = Int64Array::from_iter_values(metrics.iter().map(|m| m.timestamp));
        let sums = Float64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_sum));
        let avgs = Float64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_avg));
        let counts = Int64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_count));

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(metric_ids),
            Arc::new(timestamps),
            Arc::new(sums),
            Arc::new(avgs),
            Arc::new(counts),
        ];

        RecordBatch::try_new(schema, arrays)
            .map_err(|e| Status::internal(format!("Failed to create parameter batch: {}", e)))
    }

    /// Creates the necessary tables for metric storage and aggregation.
    ///
    /// Note: this duplicates the DDL in `init`; both are idempotent
    /// (`CREATE TABLE IF NOT EXISTS`).
    async fn create_tables(&self) -> Result<(), Status> {
        let conn = self.conn.lock().await;

        // Create metrics table
        conn.execute(r#"
            CREATE TABLE IF NOT EXISTS metrics (
                metric_id VARCHAR NOT NULL,
                timestamp BIGINT NOT NULL,
                value_running_window_sum DOUBLE NOT NULL,
                value_running_window_avg DOUBLE NOT NULL,
                value_running_window_count BIGINT NOT NULL,
                PRIMARY KEY (metric_id, timestamp)
            )
        "#, params![]).map_err(|e| Status::internal(e.to_string()))?;

        // Create index for time-based queries
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp)",
            params![]
        ).map_err(|e| Status::internal(e.to_string()))?;

        // Create table for batch-level aggregations
        conn.execute(r#"
            CREATE TABLE IF NOT EXISTS metric_aggregations (
                metric_id VARCHAR NOT NULL,
                window_start BIGINT NOT NULL,
                window_end BIGINT NOT NULL,
                running_sum DOUBLE NOT NULL,
                running_count BIGINT NOT NULL,
                min_value DOUBLE NOT NULL,
                max_value DOUBLE NOT NULL,
                PRIMARY KEY (metric_id, window_start, window_end)
            )
        "#, params![]).map_err(|e| Status::internal(e.to_string()))?;

        // Create index for window-based queries
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_aggregations_window ON metric_aggregations(window_start, window_end)",
            params![]
        ).map_err(|e| Status::internal(e.to_string()))?;

        Ok(())
    }
}

#[async_trait]
impl CacheEviction for DuckDbBackend {
    async fn execute_eviction(&self, query: &str) -> Result<(), Status> {
        // Run eviction on a background task so callers are not blocked.
        let conn = self.conn.clone();
        let query = query.to_string();
        tokio::spawn(async move {
            let conn_guard = conn.lock().await;
            if let Err(e) = conn_guard.execute_batch(&query) {
                tracing::error!("Background eviction error: {}", e);
            }
        });
        Ok(())
    }
}

#[async_trait]
impl StorageBackend for DuckDbBackend {
    async fn init(&self) -> Result<(), Status> {
        let conn = self.conn.lock().await;

        // Create metrics and aggregation tables with an optimized schema
        conn.execute_batch(r#"
            CREATE TABLE IF NOT EXISTS metrics (
                metric_id VARCHAR NOT NULL,
                timestamp BIGINT NOT NULL,
                value_running_window_sum DOUBLE NOT NULL,
                value_running_window_avg DOUBLE NOT NULL,
                value_running_window_count BIGINT NOT NULL,
                PRIMARY KEY (metric_id, timestamp)
            );

            CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);

            CREATE TABLE IF NOT EXISTS metric_aggregations (
                metric_id VARCHAR NOT NULL,
                window_start BIGINT NOT NULL,
                window_end BIGINT NOT NULL,
                running_sum DOUBLE NOT NULL,
                running_count BIGINT NOT NULL,
                min_value DOUBLE NOT NULL,
                max_value DOUBLE NOT NULL,
                PRIMARY KEY (metric_id, window_start, window_end)
            );

            CREATE INDEX IF NOT EXISTS idx_aggregations_window
            ON metric_aggregations(window_start, window_end);
        "#).map_err(|e| Status::internal(format!("Failed to create tables: {}", e)))?;

        Ok(())
    }

    async fn insert_metrics(&self, metrics: Vec<MetricRecord>) -> Result<(), Status> {
        if metrics.is_empty() {
            return Ok(());
        }

        // Check if eviction is needed
        if let Some(cutoff) = self.cache_manager.should_evict().await? {
            let query = self.cache_manager.eviction_query(cutoff);
            self.execute_eviction(&query).await?;
        }

        // Use a sliding window for batch-level aggregations
        let window = TimeWindow::Sliding {
            window: Duration::from_secs(3600), // 1 hour window
            slide: Duration::from_secs(60),    // 1 minute slide
        };

        self.insert_batch_optimized(&metrics, window).await
    }

    async fn query_metrics(&self, from_timestamp: i64) -> Result<Vec<MetricRecord>, Status> {
        // Check if eviction is needed
        if let Some(cutoff) = self.cache_manager.should_evict().await? {
            let query = self.cache_manager.eviction_query(cutoff);
            self.execute_eviction(&query).await?;
        }

        let conn = self.conn.lock().await;
        let mut stmt = conn.prepare(
            "SELECT metric_id, timestamp, value_running_window_sum, value_running_window_avg, value_running_window_count \
             FROM metrics WHERE timestamp >= ? ORDER BY timestamp ASC"
        ).map_err(|e| Status::internal(e.to_string()))?;

        let mut rows = stmt.query(params![from_timestamp])
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut metrics = Vec::new();
        while let Some(row) = rows.next().map_err(|e| Status::internal(e.to_string()))? {
            metrics.push(MetricRecord {
                metric_id: row.get(0).map_err(|e| Status::internal(e.to_string()))?,
                timestamp: row.get(1).map_err(|e| Status::internal(e.to_string()))?,
                value_running_window_sum: row.get(2).map_err(|e| Status::internal(e.to_string()))?,
                value_running_window_avg: row.get(3).map_err(|e| Status::internal(e.to_string()))?,
                value_running_window_count: row.get(4).map_err(|e| Status::internal(e.to_string()))?,
            });
        }

        Ok(metrics)
    }

    async fn prepare_sql(&self, query: &str) -> Result<Vec<u8>, Status> {
        // Statement "handles" are simply the raw SQL bytes.
        Ok(query.as_bytes().to_vec())
    }

    async fn query_sql(&self, statement_handle: &[u8]) -> Result<Vec<MetricRecord>, Status> {
        let sql = std::str::from_utf8(statement_handle)
            .map_err(|e| Status::internal(e.to_string()))?;
        // Simplification: the handle is interpreted as a starting timestamp;
        // anything that does not parse as an integer falls back to 0.
        self.query_metrics(sql.parse().unwrap_or(0)).await
    }

    async fn aggregate_metrics(
        &self,
        function: AggregateFunction,
        group_by: &GroupBy,
        from_timestamp: i64,
        to_timestamp: Option<i64>,
    ) -> Result<Vec<AggregateResult>, Status> {
        // Check if eviction is needed
        if let Some(cutoff) = self.cache_manager.should_evict().await? {
            let query = self.cache_manager.eviction_query(cutoff);
            self.execute_eviction(&query).await?;
        }

        let query = build_aggregate_query(
            "metrics",
            function,
            group_by,
            &["value_running_window_sum"],
            Some(from_timestamp),
            to_timestamp,
        );

        let conn = self.conn.lock().await;
        let mut stmt = conn.prepare(&query)
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut rows = stmt.query(params![])
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut results = Vec::new();
        while let Some(row) = rows.next().map_err(|e| Status::internal(e.to_string()))? {
            let value: f64 = row.get(0).map_err(|e| Status::internal(e.to_string()))?;
            let timestamp: i64 = row.get(1).map_err(|e| Status::internal(e.to_string()))?;

            results.push(AggregateResult { value, timestamp });
        }

        Ok(results)
    }

    fn new_with_options(
        connection_string: &str,
        options: &HashMap<String, String>,
        credentials: Option<&Credentials>,
    ) -> Result<Self, Status> {
        let mut all_options = options.clone();
        if let Some(creds) = credentials {
            all_options.insert("username".to_string(), creds.username.clone());
            all_options.insert("password".to_string(), creds.password.clone());
        }

        // A TTL of 0 (or an unparsable value) disables TTL-based eviction.
        let ttl = all_options.get("ttl")
            .and_then(|s| s.parse::<u64>().ok())
            .filter(|&ttl| ttl != 0);

        Self::new(connection_string.to_string(), all_options, ttl)
    }

    async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status> {
        // Create table in DuckDB
        let sql = Self::schema_to_create_table_sql(table_name, schema);
        self.execute(&sql).await?;

        // Register table in manager
        self.table_manager.create_table(table_name.to_string(), schema.clone()).await?;
        Ok(())
    }

    async fn insert_into_table(&self, table_name: &str, batch: RecordBatch) -> Result<(), Status> {
        let conn = self.conn.lock().await;
        let mut stmt = conn.prepare(&format!("INSERT INTO {} VALUES ({})",
            table_name,
            (0..batch.num_columns()).map(|_| "?").collect::<Vec<_>>().join(", ")
        )).map_err(|e| Status::internal(e.to_string()))?;

        for row_idx in 0..batch.num_rows() {
            let mut param_values: Vec<Box<dyn ToSql>> = Vec::new();
            for col_idx in 0..batch.num_columns() {
                let col = batch.column(col_idx);
                match col.data_type() {
                    DataType::Int64 => {
                        let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
                        param_values.push(Box::new(array.value(row_idx)));
                    }
                    DataType::Float64 => {
                        let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
                        param_values.push(Box::new(array.value(row_idx)));
                    }
                    DataType::Utf8 => {
                        let array = col.as_any().downcast_ref::<StringArray>().unwrap();
                        param_values.push(Box::new(array.value(row_idx).to_string()));
                    }
                    _ => return Err(Status::internal("Unsupported column type")),
                }
            }

            let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
            stmt.execute(param_refs.as_slice()).map_err(|e| Status::internal(e.to_string()))?;
        }

        Ok(())
    }

    async fn query_table(&self, table_name: &str, projection: Option<Vec<String>>) -> Result<RecordBatch, Status> {
        let schema = self.table_manager.get_table_schema(table_name).await?;

        let projection = projection.unwrap_or_else(|| {
            schema.fields().iter().map(|f| f.name().clone()).collect()
        });

        // Restrict the schema to the projected columns so that builder and
        // row indices line up with the columns the SELECT actually returns.
        let fields: Vec<Field> = projection.iter()
            .map(|name| {
                schema.field_with_name(name)
                    .map(|f| f.clone())
                    .map_err(|e| Status::internal(e.to_string()))
            })
            .collect::<Result<_, _>>()?;
        let projected_schema = Schema::new(fields);

        let mut builders: Vec<Box<dyn ArrayBuilder>> = projected_schema.fields().iter()
            .map(|field| Self::create_array_builder(field))
            .collect();

        let sql = format!(
            "SELECT {} FROM {}",
            projection.join(", "),
            table_name
        );

        let conn = self.conn.lock().await;
        let mut stmt = conn.prepare(&sql)
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut rows = stmt.query(params![])
            .map_err(|e| Status::internal(e.to_string()))?;

        while let Some(row) = rows.next().map_err(|e| Status::internal(e.to_string()))? {
            for (i, field) in projected_schema.fields().iter().enumerate() {
                match field.data_type() {
                    DataType::Int64 => {
                        let builder = builders[i].as_any_mut().downcast_mut::<Int64Builder>().unwrap();
                        match row.get::<usize, i64>(i) {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                    DataType::Float64 => {
                        let builder = builders[i].as_any_mut().downcast_mut::<Float64Builder>().unwrap();
                        match row.get::<usize, f64>(i) {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                    DataType::Utf8 => {
                        let builder = builders[i].as_any_mut().downcast_mut::<StringBuilder>().unwrap();
                        match row.get::<usize, String>(i) {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                    _ => return Err(Status::internal("Unsupported column type")),
                }
            }
        }

        let arrays: Vec<ArrayRef> = builders.into_iter()
            .map(|mut builder| Arc::new(builder.finish()) as ArrayRef)
            .collect();

        RecordBatch::try_new(Arc::new(projected_schema), arrays)
            .map_err(|e| Status::internal(format!("Failed to create record batch: {}", e)))
    }

    async fn create_aggregation_view(&self, view: &AggregationView) -> Result<(), Status> {
        let columns: Vec<&str> = view.aggregate_columns.iter()
            .map(|s| s.as_str())
            .collect();

        let sql = build_aggregate_query(
            &view.source_table,
            view.function,
            &view.group_by,
            &columns,
            None,
            None,
        );

        let view_name = format!("agg_view_{}", view.source_table);
        let conn = self.conn.lock().await;
        conn.execute(&format!("CREATE VIEW {} AS {}", view_name, sql), params![])
            .map_err(|e| Status::internal(format!("Failed to create view: {}", e)))?;

        // Register view in manager
        self.table_manager.create_aggregation_view(
            view_name,
            view.source_table.clone(),
            view.function.clone(),
            view.group_by.clone(),
            view.window.clone(),
            view.aggregate_columns.clone(),
        ).await?;

        Ok(())
    }

    async fn query_aggregation_view(&self, view_name: &str) -> Result<RecordBatch, Status> {
        self.query_table(view_name, None).await
    }

    async fn drop_table(&self, table_name: &str) -> Result<(), Status> {
        let conn = self.conn.lock().await;
        conn.execute(&format!("DROP TABLE IF EXISTS {}", table_name), params![])
            .map_err(|e| Status::internal(format!("Failed to drop table: {}", e)))?;

        self.table_manager.drop_table(table_name).await?;
        Ok(())
    }

    async fn drop_aggregation_view(&self, view_name: &str) -> Result<(), Status> {
        let conn = self.conn.lock().await;
        conn.execute(&format!("DROP VIEW IF EXISTS {}", view_name), params![])
            .map_err(|e| Status::internal(format!("Failed to drop view: {}", e)))?;

        self.table_manager.drop_aggregation_view(view_name).await?;
        Ok(())
    }

    fn table_manager(&self) -> &TableManager {
        &self.table_manager
    }
}

impl DuckDbBackend {
    /// Executes a SQL query.
    async fn execute(&self, query: &str) -> Result<(), Status> {
        let conn = self.conn.lock().await;
        conn.execute(query, params![])
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(())
    }

    /// Converts an Arrow schema to a DuckDB `CREATE TABLE` statement.
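    ///
    /// For example, a two-field schema renders as follows (illustrative,
    /// derived from the type mapping below):
    ///
    /// ```rust,ignore
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let schema = Schema::new(vec![
    ///     Field::new("id", DataType::Int64, false),
    ///     Field::new("name", DataType::Utf8, false),
    /// ]);
    /// let sql = DuckDbBackend::schema_to_create_table_sql("events", &schema);
    /// assert_eq!(sql, r#"CREATE TABLE IF NOT EXISTS "events" ("id" BIGINT, "name" VARCHAR)"#);
    /// ```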
    fn schema_to_create_table_sql(table_name: &str, schema: &Schema) -> String {
        let mut sql = format!("CREATE TABLE IF NOT EXISTS \"{}\" (", table_name);
        let mut first = true;

        for field in schema.fields() {
            if !first {
                sql.push_str(", ");
            }
            first = false;

            sql.push_str(&format!("\"{}\" {}", field.name(), Self::arrow_type_to_duckdb_type(field.data_type())));
        }

        sql.push(')');
        sql
    }

    /// Converts an Arrow data type to a DuckDB type string.
    fn arrow_type_to_duckdb_type(data_type: &DataType) -> &'static str {
        match data_type {
            DataType::Boolean => "BOOLEAN",
            DataType::Int8 => "TINYINT",
            DataType::Int16 => "SMALLINT",
            DataType::Int32 => "INTEGER",
            DataType::Int64 => "BIGINT",
            // DuckDB has native unsigned types; mapping to signed types of
            // the same width would lose half of the value range.
            DataType::UInt8 => "UTINYINT",
            DataType::UInt16 => "USMALLINT",
            DataType::UInt32 => "UINTEGER",
            DataType::UInt64 => "UBIGINT",
            DataType::Float32 => "REAL",
            DataType::Float64 => "DOUBLE",
            DataType::Utf8 => "VARCHAR",
            DataType::Binary => "BLOB",
            DataType::Date32 => "DATE",
            DataType::Date64 => "DATE",
            DataType::Time32(_) => "TIME",
            DataType::Time64(_) => "TIME",
            DataType::Timestamp(_, _) => "TIMESTAMP",
            _ => "VARCHAR", // Fall back to VARCHAR for unsupported types
        }
    }

    fn create_array_builder(field: &Field) -> Box<dyn ArrayBuilder> {
        match field.data_type() {
            DataType::Int64 => Box::new(Int64Builder::new()),
            DataType::Float64 => Box::new(Float64Builder::new()),
            DataType::Utf8 => Box::new(StringBuilder::new()),
            // `query_table` rejects other types while reading rows, but
            // constructing a builder for an unsupported field still panics.
            _ => panic!("Unsupported column type"),
        }
    }
}
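
// A minimal smoke-test sketch: it assumes `MetricRecord`'s fields are
// publicly constructible (as used throughout this module) and that tests
// run under a tokio runtime.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn in_memory_roundtrip() {
        let backend = DuckDbBackend::new_in_memory().expect("backend");
        // `new` spawns init on a background task; await it explicitly so
        // the tables exist before the first insert.
        backend.init().await.expect("init");

        // Use the current time so TTL-based eviction does not race the test.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("clock")
            .as_nanos() as i64;

        backend.insert_metrics(vec![MetricRecord {
            metric_id: "test.metric".to_string(),
            timestamp: now,
            value_running_window_sum: 42.0,
            value_running_window_avg: 42.0,
            value_running_window_count: 1,
        }]).await.expect("insert");

        let rows = backend.query_metrics(now - 1).await.expect("query");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].metric_id, "test.metric");
    }
}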