// hyprstream_core/storage/adbc.rs

1//! ADBC (Arrow Database Connectivity) storage backend implementation.
2//!
3//! This module provides a storage backend using ADBC, enabling:
4//! - Connection to any ADBC-compliant database
5//! - High-performance data transport using Arrow's columnar format
6//! - Connection pooling and prepared statements
7//! - Support for various database systems (PostgreSQL, MySQL, etc.)
8//!
9//! # Configuration
10//!
11//! The ADBC backend can be configured using the following options:
12//!
13//! ```toml
14//! [engine]
15//! engine = "adbc"
16//! # Base connection without credentials
17//! connection = "postgresql://localhost:5432/metrics"
18//! options = {
19//!     driver_path = "/usr/local/lib/libadbc_driver_postgresql.so",  # Required: Path to ADBC driver
20//!     pool_max = "10",                                            # Optional: Maximum pool connections
21//!     pool_min = "1",                                             # Optional: Minimum pool connections
22//!     connect_timeout = "30"                                      # Optional: Connection timeout in seconds
23//! }
24//! ```
25//!
26//! For security, credentials should be provided via environment variables:
27//! ```bash
28//! export HYPRSTREAM_DB_USERNAME=postgres
29//! export HYPRSTREAM_DB_PASSWORD=secret
30//! ```
31//!
32//! Or via command line:
33//!
34//! ```bash
35//! hyprstream \
36//!   --engine adbc \
37//!   --engine-connection "postgresql://localhost:5432/metrics" \
38//!   --engine-options driver_path=/usr/local/lib/libadbc_driver_postgresql.so \
39//!   --engine-options pool_max=10
40//! ```
41//!
42//! The implementation is optimized for efficient data transfer and
43//! query execution using Arrow's native formats.
44
45use adbc_core::{
46    driver_manager::{ManagedConnection, ManagedDriver},
47    options::{AdbcVersion, OptionDatabase, OptionValue},
48    Connection, Database, Driver, Statement, Optionable,
49};
50use arrow_array::{
51    Array, Int8Array, Int16Array, Int32Array, Int64Array,
52    Float32Array, Float64Array, BooleanArray, StringArray,
53    BinaryArray, TimestampNanosecondArray,
54};
55use arrow_schema::{Schema, DataType, Field};
56use async_trait::async_trait;
57use std::collections::HashMap;
58use std::sync::Arc;
59use std::sync::atomic::{AtomicU64, Ordering};
60use tokio::sync::Mutex;
61use tonic::Status;
62use crate::aggregation::{AggregateFunction, GroupBy, AggregateResult, build_aggregate_query};
63use crate::storage::table_manager::{TableManager, AggregationView};
64use crate::config::Credentials;
65use crate::metrics::MetricRecord;
66use crate::storage::StorageBackend;
67use crate::storage::cache::{CacheManager, CacheEviction};
68use arrow_array::ArrayRef;
69use arrow_array::RecordBatch;
70use crate::aggregation::TimeWindow;
71use crate::storage::BatchAggregation;
72use std::time::Duration;
73use hex;
74
/// Storage backend backed by an ADBC-compliant database driver.
pub struct AdbcBackend {
    /// Shared ADBC connection; all statement execution is serialized through
    /// this async mutex.
    conn: Arc<Mutex<ManagedConnection>>,
    /// Monotonic counter used to mint prepared-statement handles.
    statement_counter: AtomicU64,
    /// Registry of `(handle, SQL)` pairs created by `prepare_sql`.
    prepared_statements: Arc<Mutex<Vec<(u64, String)>>>,
    /// TTL/eviction policy driving background cleanup of metric rows.
    cache_manager: CacheManager,
    /// Table/view metadata manager.
    /// NOTE(review): only constructed, never read, in this chunk — confirm usage elsewhere.
    table_manager: TableManager,
}
82
#[async_trait]
impl CacheEviction for AdbcBackend {
    /// Runs an eviction `query` in a detached background task.
    ///
    /// Fire-and-forget by design: this returns `Ok(())` immediately after
    /// spawning. Failures inside the task are only logged via `tracing`,
    /// never surfaced to the caller.
    async fn execute_eviction(&self, query: &str) -> Result<(), Status> {
        let conn = self.conn.clone();
        let query = query.to_string(); // Clone for background task
        tokio::spawn(async move {
            // Locks the shared connection, so eviction waits behind (and
            // briefly blocks) any in-flight statements.
            let mut conn_guard = conn.lock().await;
            if let Err(e) = conn_guard.new_statement()
                .and_then(|mut stmt| {
                    stmt.set_sql_query(&query)?;
                    stmt.execute_update()
                }) {
                tracing::error!("Background eviction error: {}", e);
            }
        });
        Ok(())
    }
}
101
102impl AdbcBackend {
103    pub fn new(driver_path: &str, connection: Option<&str>, credentials: Option<&Credentials>) -> Result<Self, Status> {
104        let mut driver = ManagedDriver::load_dynamic_from_filename(
105            driver_path,
106            None,
107            AdbcVersion::V100,
108        ).map_err(|e| Status::internal(format!("Failed to load ADBC driver: {}", e)))?;
109
110        let mut database = driver.new_database()
111            .map_err(|e| Status::internal(format!("Failed to create database: {}", e)))?;
112
113        // Set connection string if provided
114        if let Some(conn_str) = connection {
115            database.set_option(OptionDatabase::Uri, OptionValue::String(conn_str.to_string()))
116                .map_err(|e| Status::internal(format!("Failed to set connection string: {}", e)))?;
117        }
118
119        // Set credentials if provided
120        if let Some(creds) = credentials {
121            database.set_option(OptionDatabase::Username, OptionValue::String(creds.username.clone()))
122                .map_err(|e| Status::internal(format!("Failed to set username: {}", e)))?;
123
124            database.set_option(OptionDatabase::Password, OptionValue::String(creds.password.clone()))
125                .map_err(|e| Status::internal(format!("Failed to set password: {}", e)))?;
126        }
127
128        let connection = database.new_connection()
129            .map_err(|e| Status::internal(format!("Failed to create connection: {}", e)))?;
130
131        Ok(Self {
132            conn: Arc::new(Mutex::new(connection)),
133            statement_counter: AtomicU64::new(0),
134            prepared_statements: Arc::new(Mutex::new(Vec::new())),
135            cache_manager: CacheManager::new(None), // Initialize without TTL
136            table_manager: TableManager::new(),
137        })
138    }
139
    /// Acquires the shared connection mutex and returns its guard.
    ///
    /// NOTE(review): not referenced anywhere in this chunk — the other
    /// methods lock `self.conn` directly. Confirm whether this helper is
    /// still needed.
    async fn get_connection(&self) -> Result<tokio::sync::MutexGuard<'_, ManagedConnection>, Status> {
        Ok(self.conn.lock().await)
    }
143
144    async fn execute_statement(&self, conn: &mut ManagedConnection, query: &str) -> Result<(), Status> {
145        let mut stmt = conn.new_statement()
146            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
147
148        stmt.set_sql_query(query)
149            .map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
150
151        stmt.execute_update()
152            .map_err(|e| Status::internal(format!("Failed to execute statement: {}", e)))?;
153
154        Ok(())
155    }
156
157    async fn execute_query(&self, conn: &mut ManagedConnection, query: &str, params: Option<RecordBatch>) -> Result<Vec<MetricRecord>, Status> {
158        let mut stmt = conn.new_statement()
159            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
160
161        stmt.set_sql_query(query)
162            .map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
163
164        if let Some(batch) = params {
165            // Create a new statement for binding parameters
166            let mut bind_stmt = conn.new_statement()
167                .map_err(|e| Status::internal(format!("Failed to create bind statement: {}", e)))?;
168
169            // Set the parameters using SQL directly
170            let mut param_values = Vec::new();
171            for i in 0..batch.num_rows() {
172                for j in 0..batch.num_columns() {
173                    let col = batch.column(j);
174                    match col.data_type() {
175                        DataType::Int64 => {
176                            let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
177                            param_values.push(array.value(i).to_string());
178                        }
179                        DataType::Float64 => {
180                            let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
181                            param_values.push(array.value(i).to_string());
182                        }
183                        DataType::Utf8 => {
184                            let array = col.as_any().downcast_ref::<StringArray>().unwrap();
185                            param_values.push(format!("'{}'", array.value(i)));
186                        }
187                        _ => return Err(Status::internal("Unsupported parameter type")),
188                    }
189                }
190            }
191
192            let params_sql = format!("VALUES ({})", param_values.join(", "));
193            bind_stmt.set_sql_query(&params_sql)
194                .map_err(|e| Status::internal(format!("Failed to set parameters: {}", e)))?;
195
196            let mut bind_result = bind_stmt.execute()
197                .map_err(|e| Status::internal(format!("Failed to execute parameter binding: {}", e)))?;
198
199            while let Some(batch_result) = bind_result.next() {
200                let _ = batch_result.map_err(|e| Status::internal(format!("Failed to bind parameters: {}", e)))?;
201            }
202        }
203
204        let mut reader = stmt.execute()
205            .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
206
207        let mut metrics = Vec::new();
208        while let Some(batch_result) = reader.next() {
209            let batch = batch_result.map_err(|e| Status::internal(format!("Failed to get next batch: {}", e)))?;
210            
211            let metric_ids = batch.column_by_name("metric_id")
212                .and_then(|col| col.as_any().downcast_ref::<StringArray>())
213                .ok_or_else(|| Status::internal("Invalid metric_id column"))?;
214
215            let timestamps = batch.column_by_name("timestamp")
216                .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
217                .ok_or_else(|| Status::internal("Invalid timestamp column"))?;
218
219            let sums = batch.column_by_name("value_running_window_sum")
220                .and_then(|col| col.as_any().downcast_ref::<Float64Array>())
221                .ok_or_else(|| Status::internal("Invalid value_running_window_sum column"))?;
222
223            let avgs = batch.column_by_name("value_running_window_avg")
224                .and_then(|col| col.as_any().downcast_ref::<Float64Array>())
225                .ok_or_else(|| Status::internal("Invalid value_running_window_avg column"))?;
226
227            let counts = batch.column_by_name("value_running_window_count")
228                .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
229                .ok_or_else(|| Status::internal("Invalid value_running_window_count column"))?;
230
231            for i in 0..batch.num_rows() {
232                metrics.push(MetricRecord {
233                    metric_id: metric_ids.value(i).to_string(),
234                    timestamp: timestamps.value(i),
235                    value_running_window_sum: sums.value(i),
236                    value_running_window_avg: avgs.value(i),
237                    value_running_window_count: counts.value(i),
238                });
239            }
240        }
241
242        Ok(metrics)
243    }
244
245    fn prepare_timestamp_param(timestamp: i64) -> Result<RecordBatch, Status> {
246        let schema = Arc::new(Schema::new(vec![
247            Field::new("timestamp", DataType::Int64, false),
248        ]));
249
250        let timestamps: ArrayRef = Arc::new(Int64Array::from(vec![timestamp]));
251        
252        RecordBatch::try_new(schema, vec![timestamps])
253            .map_err(|e| Status::internal(format!("Failed to create parameter batch: {}", e)))
254    }
255
256    fn prepare_params(metrics: &[MetricRecord]) -> Result<RecordBatch, Status> {
257        let schema = Arc::new(Schema::new(vec![
258            Field::new("metric_id", DataType::Utf8, false),
259            Field::new("timestamp", DataType::Int64, false),
260            Field::new("value_running_window_sum", DataType::Float64, false),
261            Field::new("value_running_window_avg", DataType::Float64, false),
262            Field::new("value_running_window_count", DataType::Int64, false),
263        ]));
264
265        let metric_ids = StringArray::from_iter_values(metrics.iter().map(|m| m.metric_id.as_str()));
266        let timestamps = Int64Array::from_iter_values(metrics.iter().map(|m| m.timestamp));
267        let sums = Float64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_sum));
268        let avgs = Float64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_avg));
269        let counts = Int64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_count));
270
271        let arrays: Vec<ArrayRef> = vec![
272            Arc::new(metric_ids),
273            Arc::new(timestamps),
274            Arc::new(sums),
275            Arc::new(avgs),
276            Arc::new(counts),
277        ];
278
279        RecordBatch::try_new(schema, arrays)
280            .map_err(|e| Status::internal(format!("Failed to create parameter batch: {}", e)))
281    }
282
283    /// Inserts a batch of metrics with optimized aggregation updates.
284    async fn insert_batch_optimized(&self, metrics: &[MetricRecord], _window: TimeWindow) -> Result<(), Status> {
285        // Begin transaction
286        self.begin_transaction().await?;
287        let mut conn = self.conn.lock().await;
288        
289        // Insert metrics
290        let batch = Self::prepare_params(metrics)?;
291        let sql = self.build_insert_sql("metrics", &batch);
292        let mut stmt = conn.new_statement()
293            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
294
295        stmt.set_sql_query(&sql)
296            .map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
297
298        // Bind parameters
299        let mut bind_stmt = conn.new_statement()
300            .map_err(|e| Status::internal(format!("Failed to create bind statement: {}", e)))?;
301
302        let mut param_values = Vec::new();
303        for i in 0..batch.num_rows() {
304            for j in 0..batch.num_columns() {
305                let col = batch.column(j);
306                match col.data_type() {
307                    DataType::Int64 => {
308                        let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
309                        param_values.push(array.value(i).to_string());
310                    }
311                    DataType::Float64 => {
312                        let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
313                        param_values.push(array.value(i).to_string());
314                    }
315                    DataType::Utf8 => {
316                        let array = col.as_any().downcast_ref::<StringArray>().unwrap();
317                        param_values.push(format!("'{}'", array.value(i)));
318                    }
319                    _ => return Err(Status::internal("Unsupported parameter type")),
320                }
321            }
322        }
323
324        let params_sql = format!("VALUES ({})", param_values.join(", "));
325        bind_stmt.set_sql_query(&params_sql)
326            .map_err(|e| Status::internal(format!("Failed to set parameters: {}", e)))?;
327
328        let mut bind_result = bind_stmt.execute()
329            .map_err(|e| Status::internal(format!("Failed to execute parameter binding: {}", e)))?;
330
331        while let Some(batch_result) = bind_result.next() {
332            let _ = batch_result.map_err(|e| Status::internal(format!("Failed to bind parameters: {}", e)))?;
333        }
334
335        stmt.execute_update()
336            .map_err(|e| Status::internal(format!("Failed to insert metrics: {}", e)))?;
337
338        // Commit transaction
339        self.commit_transaction().await?;
340
341        Ok(())
342    }
343
344    /// Prepares parameters for aggregation insertion
345    fn prepare_aggregation_params(agg: &BatchAggregation) -> Result<RecordBatch, Status> {
346        let schema = Arc::new(Schema::new(vec![
347            Field::new("metric_id", DataType::Utf8, false),
348            Field::new("window_start", DataType::Int64, false),
349            Field::new("window_end", DataType::Int64, false),
350            Field::new("running_sum", DataType::Float64, false),
351            Field::new("running_count", DataType::Int64, false),
352            Field::new("min_value", DataType::Float64, false),
353            Field::new("max_value", DataType::Float64, false),
354        ]));
355
356        let arrays: Vec<ArrayRef> = vec![
357            Arc::new(StringArray::from(vec![agg.metric_id.as_str()])),
358            Arc::new(Int64Array::from(vec![agg.window_start])),
359            Arc::new(Int64Array::from(vec![agg.window_end])),
360            Arc::new(Float64Array::from(vec![agg.running_sum])),
361            Arc::new(Int64Array::from(vec![agg.running_count])),
362            Arc::new(Float64Array::from(vec![agg.min_value])),
363            Arc::new(Float64Array::from(vec![agg.max_value])),
364        ];
365
366        RecordBatch::try_new(schema, arrays)
367            .map_err(|e| Status::internal(format!("Failed to create aggregation batch: {}", e)))
368    }
369
370    async fn begin_transaction(&self) -> Result<(), Status> {
371        let mut conn = self.conn.lock().await;
372        let mut stmt = conn.new_statement()
373            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
374
375        stmt.set_sql_query("BEGIN")
376            .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
377
378        stmt.execute_update()
379            .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
380
381        Ok(())
382    }
383
384    async fn commit_transaction(&self) -> Result<(), Status> {
385        let mut conn = self.conn.lock().await;
386        let mut stmt = conn.new_statement()
387            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
388
389        stmt.set_sql_query("COMMIT")
390            .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
391
392        stmt.execute_update()
393            .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
394
395        Ok(())
396    }
397
    /// Rolls back the current transaction on `conn` by issuing `ROLLBACK`.
    async fn rollback_transaction(&self, conn: &mut ManagedConnection) -> Result<(), Status> {
        self.execute_statement(conn, "ROLLBACK").await
    }
401
    /// Creates `table_name` (if absent) with columns derived from `schema`.
    ///
    /// NOTE(review): an identically-named method also exists on the
    /// `StorageBackend` impl with the same body; trait-dispatched calls
    /// resolve there. Consider removing one of the two.
    async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status> {
        let mut conn = self.conn.lock().await;
        let sql = self.build_create_table_sql(table_name, schema);
        self.execute_statement(&mut conn, &sql).await
    }
407
    /// Creates a SQL view wrapping the supplied `sql` query.
    ///
    /// NOTE(review): the view is named after `view.source_table` — the table
    /// it reads from — which looks like it would collide with the source
    /// table's own name. Confirm whether `AggregationView` carries a
    /// dedicated view-name field that should be used here instead.
    async fn create_view(&self, view: &AggregationView, sql: &str) -> Result<(), Status> {
        let mut conn = self.conn.lock().await;
        let create_view_sql = format!("CREATE VIEW {} AS {}", view.source_table, sql);
        self.execute_statement(&mut conn, &create_view_sql).await
    }
413
414    async fn drop_table(&self, table_name: &str) -> Result<(), Status> {
415        let mut conn = self.conn.lock().await;
416        let sql = format!("DROP TABLE IF EXISTS {}", table_name);
417        self.execute_statement(&mut conn, &sql).await
418    }
419
    /// Drops `view_name` if it exists; succeeds silently when it does not.
    async fn drop_view(&self, view_name: &str) -> Result<(), Status> {
        let mut conn = self.conn.lock().await;
        let sql = format!("DROP VIEW IF EXISTS {}", view_name);
        self.execute_statement(&mut conn, &sql).await
    }
425
426    fn build_create_table_sql(&self, table_name: &str, schema: &Schema) -> String {
427        let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", table_name);
428        let mut first = true;
429
430        for field in schema.fields() {
431            if !first {
432                sql.push_str(", ");
433            }
434            first = false;
435
436            sql.push_str(&format!("{} {}", field.name(), self.arrow_type_to_sql_type(field.data_type())));
437        }
438
439        sql.push_str(")");
440        sql
441    }
442
443    fn build_insert_sql(&self, table_name: &str, batch: &RecordBatch) -> String {
444        let mut sql = format!("INSERT INTO {} (", table_name);
445        let mut first = true;
446
447        for field in batch.schema().fields() {
448            if !first {
449                sql.push_str(", ");
450            }
451            first = false;
452            sql.push_str(field.name());
453        }
454
455        sql.push_str(") VALUES (");
456        first = true;
457
458        for i in 0..batch.num_columns() {
459            if !first {
460                sql.push_str(", ");
461            }
462            first = false;
463            sql.push('?');
464        }
465
466        sql.push(')');
467        sql
468    }
469
    /// Maps an Arrow `DataType` to a generic SQL column type name for DDL.
    ///
    /// Unlisted Arrow types fall back to `VARCHAR`. The names are generic
    /// SQL; individual drivers may interpret them differently.
    fn arrow_type_to_sql_type(&self, data_type: &DataType) -> &'static str {
        match data_type {
            DataType::Boolean => "BOOLEAN",
            DataType::Int8 => "TINYINT",
            DataType::Int16 => "SMALLINT",
            DataType::Int32 => "INTEGER",
            DataType::Int64 => "BIGINT",
            DataType::UInt8 => "TINYINT UNSIGNED",
            DataType::UInt16 => "SMALLINT UNSIGNED",
            DataType::UInt32 => "INTEGER UNSIGNED",
            DataType::UInt64 => "BIGINT UNSIGNED",
            DataType::Float32 => "FLOAT",
            DataType::Float64 => "DOUBLE",
            DataType::Utf8 => "VARCHAR",
            DataType::Binary => "BLOB",
            DataType::Date32 => "DATE",
            DataType::Date64 => "DATE",
            DataType::Time32(_) => "TIME",
            DataType::Time64(_) => "TIME",
            DataType::Timestamp(_, _) => "TIMESTAMP",
            // Catch-all: store anything else as text.
            _ => "VARCHAR",
        }
    }
493}
494
495#[async_trait]
496impl StorageBackend for AdbcBackend {
497    async fn init(&self) -> Result<(), Status> {
498        let mut conn = self.conn.lock().await;
499        
500        // Create metrics table
501        let mut stmt = conn.new_statement()
502            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
503
504        stmt.set_sql_query(r#"
505            CREATE TABLE IF NOT EXISTS metrics (
506                metric_id VARCHAR NOT NULL,
507                timestamp BIGINT NOT NULL,
508                value_running_window_sum DOUBLE PRECISION NOT NULL,
509                value_running_window_avg DOUBLE PRECISION NOT NULL,
510                value_running_window_count BIGINT NOT NULL,
511                PRIMARY KEY (metric_id, timestamp)
512            );
513
514            CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);
515
516            CREATE TABLE IF NOT EXISTS metric_aggregations (
517                metric_id VARCHAR NOT NULL,
518                window_start BIGINT NOT NULL,
519                window_end BIGINT NOT NULL,
520                running_sum DOUBLE PRECISION NOT NULL,
521                running_count BIGINT NOT NULL,
522                min_value DOUBLE PRECISION NOT NULL,
523                max_value DOUBLE PRECISION NOT NULL,
524                PRIMARY KEY (metric_id, window_start, window_end)
525            );
526
527            CREATE INDEX IF NOT EXISTS idx_aggregations_window 
528            ON metric_aggregations(window_start, window_end);
529        "#).map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
530
531        stmt.execute_update()
532            .map_err(|e| Status::internal(format!("Failed to create tables: {}", e)))?;
533
534        Ok(())
535    }
536
537    async fn insert_metrics(&self, metrics: Vec<MetricRecord>) -> Result<(), Status> {
538        if metrics.is_empty() {
539            return Ok(());
540        }
541
542        // Check if eviction is needed
543        if let Some(cutoff) = self.cache_manager.should_evict().await? {
544            let query = self.cache_manager.eviction_query(cutoff);
545            self.execute_eviction(&query).await?;
546        }
547
548        // Use sliding window for batch-level aggregations
549        let window = TimeWindow::Sliding {
550            window: Duration::from_secs(3600), // 1 hour window
551            slide: Duration::from_secs(60),    // 1 minute slide
552        };
553
554        // Use optimized batch insertion
555        self.insert_batch_optimized(&metrics, window).await
556    }
557
    /// Returns all metrics with `timestamp >= from_timestamp`, oldest first.
    ///
    /// Runs a TTL-driven eviction pass first when the cache policy requests
    /// one. The timestamp is passed as a bound parameter, not interpolated.
    async fn query_metrics(&self, from_timestamp: i64) -> Result<Vec<MetricRecord>, Status> {
        // Check if eviction is needed
        if let Some(cutoff) = self.cache_manager.should_evict().await? {
            let query = self.cache_manager.eviction_query(cutoff);
            self.execute_eviction(&query).await?;
        }

        let mut conn = self.conn.lock().await;

        let query = r#"
            SELECT
                metric_id,
                timestamp,
                value_running_window_sum,
                value_running_window_avg,
                value_running_window_count
            FROM metrics
            WHERE timestamp >= ?
            ORDER BY timestamp ASC
        "#;

        let params = Self::prepare_timestamp_param(from_timestamp)?;
        self.execute_query(&mut conn, query, Some(params)).await
    }
582
    /// Registers `query` and returns an opaque 8-byte little-endian handle
    /// for later execution via `query_sql`.
    ///
    /// NOTE(review): handles are never removed, so `prepared_statements`
    /// grows without bound over the backend's lifetime — consider a
    /// release API or an eviction policy.
    async fn prepare_sql(&self, query: &str) -> Result<Vec<u8>, Status> {
        let handle = self.statement_counter.fetch_add(1, Ordering::SeqCst);
        let mut statements = self.prepared_statements.lock().await;
        statements.push((handle, query.to_string()));
        Ok(handle.to_le_bytes().to_vec())
    }
589
590    async fn query_sql(&self, statement_handle: &[u8]) -> Result<Vec<MetricRecord>, Status> {
591        let handle = u64::from_le_bytes(
592            statement_handle.try_into()
593                .map_err(|_| Status::invalid_argument("Invalid statement handle"))?
594        );
595
596        let statements = self.prepared_statements.lock().await;
597        let sql = statements
598            .iter()
599            .find(|(h, _)| *h == handle)
600            .map(|(_, sql)| sql.as_str())
601            .ok_or_else(|| Status::invalid_argument("Statement handle not found"))?;
602
603        let mut conn = self.conn.lock().await;
604        self.execute_query(&mut conn, sql, None).await
605    }
606
607    async fn aggregate_metrics(
608        &self,
609        function: AggregateFunction,
610        group_by: &GroupBy,
611        from_timestamp: i64,
612        to_timestamp: Option<i64>,
613    ) -> Result<Vec<AggregateResult>, Status> {
614        // Check if eviction is needed
615        if let Some(cutoff) = self.cache_manager.should_evict().await? {
616            let query = self.cache_manager.eviction_query(cutoff);
617            self.execute_eviction(&query).await?;
618        }
619
620        const DEFAULT_COLUMNS: [&str; 5] = [
621            "metric_id",
622            "timestamp",
623            "value_running_window_sum",
624            "value_running_window_avg",
625            "value_running_window_count"
626        ];
627
628        let query = build_aggregate_query(
629            "metrics",
630            function,
631            group_by,
632            &DEFAULT_COLUMNS,
633            Some(from_timestamp),
634            to_timestamp,
635        );
636        let mut conn = self.conn.lock().await;
637        let metrics = self.execute_query(&mut conn, &query, None).await?;
638
639        let mut results = Vec::new();
640        for metric in metrics {
641            let result = AggregateResult {
642                value: metric.value_running_window_sum,
643                timestamp: metric.timestamp,
644                // Add any other fields required by AggregateResult
645            };
646            results.push(result);
647        }
648
649        Ok(results)
650    }
651
652    fn new_with_options(
653        connection_string: &str,
654        options: &HashMap<String, String>,
655        credentials: Option<&Credentials>,
656    ) -> Result<Self, Status> {
657        let driver_path = options.get("driver_path")
658            .ok_or_else(|| Status::invalid_argument("driver_path is required"))?;
659
660        let mut driver = ManagedDriver::load_dynamic_from_filename(
661            driver_path,
662            None,
663            AdbcVersion::V100,
664        ).map_err(|e| Status::internal(format!("Failed to load ADBC driver: {}", e)))?;
665
666        let mut database = driver.new_database()
667            .map_err(|e| Status::internal(format!("Failed to create database: {}", e)))?;
668
669        // Set connection string
670        database.set_option(OptionDatabase::Uri, OptionValue::String(connection_string.to_string()))
671            .map_err(|e| Status::internal(format!("Failed to set connection string: {}", e)))?;
672
673        // Set credentials if provided
674        if let Some(creds) = credentials {
675            database.set_option(OptionDatabase::Username, OptionValue::String(creds.username.clone()))
676                .map_err(|e| Status::internal(format!("Failed to set username: {}", e)))?;
677
678            database.set_option(OptionDatabase::Password, OptionValue::String(creds.password.clone()))
679                .map_err(|e| Status::internal(format!("Failed to set password: {}", e)))?;
680        }
681
682        let connection = database.new_connection()
683            .map_err(|e| Status::internal(format!("Failed to create connection: {}", e)))?;
684
685        Ok(Self {
686            conn: Arc::new(Mutex::new(connection)),
687            statement_counter: AtomicU64::new(0),
688            prepared_statements: Arc::new(Mutex::new(Vec::new())),
689            cache_manager: CacheManager::new(None), // Initialize without TTL
690            table_manager: TableManager::new(),
691        })
692    }
693
    /// Creates `table_name` (if absent) with columns derived from `schema`.
    ///
    /// NOTE(review): duplicates the inherent method of the same name on
    /// `AdbcBackend`; keep the two in sync or remove one.
    async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status> {
        let mut conn = self.conn.lock().await;
        let sql = self.build_create_table_sql(table_name, schema);
        self.execute_statement(&mut conn, &sql).await
    }
699
700    async fn insert_into_table(&self, table_name: &str, batch: RecordBatch) -> Result<(), Status> {
701        let mut conn = self.conn.lock().await;
702        let sql = self.build_insert_sql(table_name, &batch);
703        self.execute_statement(&mut conn, &sql).await
704    }
705
706    async fn query_table(&self, table_name: &str, projection: Option<Vec<String>>) -> Result<RecordBatch, Status> {
707        let mut conn = self.conn.lock().await;
708        let mut stmt = conn.new_statement()
709            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
710
711        let columns = projection.map(|cols| cols.join(", ")).unwrap_or_else(|| "*".to_string());
712        let sql = format!("SELECT {} FROM {}", columns, table_name);
713
714        stmt.set_sql_query(&sql)
715            .map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
716
717        let mut reader = stmt.execute()
718            .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
719
720        let batch = reader.next()
721            .ok_or_else(|| Status::internal("No data returned"))?
722            .map_err(|e| Status::internal(format!("Failed to read record batch: {}", e)))?;
723
724        // Convert ADBC RecordBatch to Arrow RecordBatch
725        let schema = batch.schema();
726        let mut arrays = Vec::with_capacity(batch.num_columns());
727
728        for _i in 0..batch.num_columns() {
729            let col = batch.column(_i);
730            let array: ArrayRef = match col.data_type() {
731                &duckdb::arrow::datatypes::DataType::Int64 => {
732                    Arc::new(col.as_any().downcast_ref::<Int64Array>().unwrap().clone())
733                },
734                &duckdb::arrow::datatypes::DataType::Float64 => {
735                    Arc::new(col.as_any().downcast_ref::<Float64Array>().unwrap().clone())
736                },
737                &duckdb::arrow::datatypes::DataType::Utf8 => {
738                    Arc::new(col.as_any().downcast_ref::<StringArray>().unwrap().clone())
739                },
740                _ => return Err(Status::internal("Unsupported column type")),
741            };
742            arrays.push(array);
743        }
744
745        // Convert DuckDB schema to Arrow schema
746        let fields: Vec<Field> = schema.fields().iter().map(|f| {
747            Field::new(
748                f.name(),
749                match f.data_type() {
750                    &duckdb::arrow::datatypes::DataType::Int64 => DataType::Int64,
751                    &duckdb::arrow::datatypes::DataType::Float64 => DataType::Float64,
752                    &duckdb::arrow::datatypes::DataType::Utf8 => DataType::Utf8,
753                    _ => DataType::Utf8, // Default to string for unsupported types
754                },
755                f.is_nullable()
756            )
757        }).collect();
758
759        let arrow_schema = Schema::new(fields);
760        RecordBatch::try_new(Arc::new(arrow_schema), arrays)
761            .map_err(|e| Status::internal(format!("Failed to create record batch: {}", e)))
762    }
763
    /// Creates a SQL view wrapping the aggregate query described by `view`.
    ///
    /// NOTE(review): the view is created under `view.source_table` — the
    /// same name as the table it reads from. The sibling methods
    /// `query_aggregation_view`/`drop_aggregation_view` take a separate
    /// `view_name`, so this looks like it should use a dedicated view-name
    /// field instead; confirm against the `AggregationView` definition.
    async fn create_aggregation_view(&self, view: &AggregationView) -> Result<(), Status> {
        // Borrow the column names as &str for the query builder.
        let columns: Vec<&str> = view.aggregate_columns.iter()
            .map(|s| s.as_str())
            .collect();
            
        // No time-bucket / filter arguments are supplied (the two `None`s).
        let sql = build_aggregate_query(
            &view.source_table,
            view.function,
            &view.group_by,
            &columns,
            None,
            None
        );
        
        let mut conn = self.conn.lock().await;
        let mut stmt = conn.new_statement()
            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;

        stmt.set_sql_query(&format!("CREATE VIEW {} AS {}", view.source_table, sql))
            .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;

        // DDL: use execute_update (no result set expected).
        stmt.execute_update()
            .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;

        Ok(())
    }
790
791    async fn query_aggregation_view(&self, view_name: &str) -> Result<RecordBatch, Status> {
792        let sql = format!("SELECT * FROM {}", view_name);
793
794        let mut conn = self.conn.lock().await;
795        let mut stmt = conn.new_statement()
796            .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
797
798        stmt.set_sql_query(&sql)
799            .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
800
801        let mut reader = stmt.execute()
802            .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
803
804        let batch = reader.next()
805            .ok_or_else(|| Status::internal("No data returned"))?
806            .map_err(|e| Status::internal(format!("Failed to read record batch: {}", e)))?;
807
808        // Convert DuckDB RecordBatch to Arrow RecordBatch
809        let schema = batch.schema();
810        let mut arrays = Vec::with_capacity(batch.num_columns());
811
812        for i in 0..batch.num_columns() {
813            let col = batch.column(i);
814            let array: ArrayRef = match col.data_type() {
815                &duckdb::arrow::datatypes::DataType::Int64 => {
816                    Arc::new(col.as_any().downcast_ref::<Int64Array>().unwrap().clone())
817                },
818                &duckdb::arrow::datatypes::DataType::Float64 => {
819                    Arc::new(col.as_any().downcast_ref::<Float64Array>().unwrap().clone())
820                },
821                &duckdb::arrow::datatypes::DataType::Utf8 => {
822                    Arc::new(col.as_any().downcast_ref::<StringArray>().unwrap().clone())
823                },
824                _ => return Err(Status::internal("Unsupported column type")),
825            };
826            arrays.push(array);
827        }
828
829        // Convert DuckDB schema to Arrow schema
830        let fields: Vec<Field> = schema.fields().iter().map(|f| {
831            Field::new(
832                f.name(),
833                match f.data_type() {
834                    &duckdb::arrow::datatypes::DataType::Int64 => DataType::Int64,
835                    &duckdb::arrow::datatypes::DataType::Float64 => DataType::Float64,
836                    &duckdb::arrow::datatypes::DataType::Utf8 => DataType::Utf8,
837                    _ => DataType::Utf8, // Default to string for unsupported types
838                },
839                f.is_nullable()
840            )
841        }).collect();
842
843        let arrow_schema = Schema::new(fields);
844        RecordBatch::try_new(Arc::new(arrow_schema), arrays)
845            .map_err(|e| Status::internal(format!("Failed to create record batch: {}", e)))
846    }
847
848    async fn drop_table(&self, table_name: &str) -> Result<(), Status> {
849        let mut conn = self.conn.lock().await;
850        let mut stmt = conn.new_statement().map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
851        stmt.set_sql_query(&format!("DROP TABLE IF EXISTS {}", table_name))
852            .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
853        stmt.execute_update().map_err(|e| Status::internal(format!("Failed to drop table: {}", e)))?;
854        Ok(())
855    }
856
857    async fn drop_aggregation_view(&self, view_name: &str) -> Result<(), Status> {
858        let mut conn = self.conn.lock().await;
859        let mut stmt = conn.new_statement().map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
860        stmt.set_sql_query(&format!("DROP VIEW IF EXISTS {}", view_name))
861            .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
862        stmt.execute_update().map_err(|e| Status::internal(format!("Failed to drop view: {}", e)))?;
863        Ok(())
864    }
865
    /// Returns a reference to this backend's table-metadata manager.
    fn table_manager(&self) -> &TableManager {
        &self.table_manager
    }
869}
870
871fn format_value(array: &dyn Array, index: usize) -> String {
872    match array.data_type() {
873        DataType::Int8 => format!("{}", array.as_any().downcast_ref::<Int8Array>().unwrap().value(index)),
874        DataType::Int16 => format!("{}", array.as_any().downcast_ref::<Int16Array>().unwrap().value(index)),
875        DataType::Int32 => format!("{}", array.as_any().downcast_ref::<Int32Array>().unwrap().value(index)),
876        DataType::Int64 => format!("{}", array.as_any().downcast_ref::<Int64Array>().unwrap().value(index)),
877        DataType::Float32 => format!("{}", array.as_any().downcast_ref::<Float32Array>().unwrap().value(index)),
878        DataType::Float64 => format!("{}", array.as_any().downcast_ref::<Float64Array>().unwrap().value(index)),
879        DataType::Boolean => format!("{}", array.as_any().downcast_ref::<BooleanArray>().unwrap().value(index)),
880        DataType::Utf8 => format!("'{}'", array.as_any().downcast_ref::<StringArray>().unwrap().value(index)),
881        DataType::Binary => format!("X'{}'", hex::encode(array.as_any().downcast_ref::<BinaryArray>().unwrap().value(index))),
882        DataType::Timestamp(_, _) => {
883            let ts = array.as_any().downcast_ref::<TimestampNanosecondArray>().unwrap().value(index);
884            let seconds = ts / 1_000_000_000;
885            let nanos = (ts % 1_000_000_000) as u32;
886            format!("'{}'", chrono::DateTime::from_timestamp(seconds, nanos)
887                .unwrap_or_default()
888                .naive_utc())
889        },
890        _ => "NULL".to_string(),
891    }
892}