1use adbc_core::{
46 driver_manager::{ManagedConnection, ManagedDriver},
47 options::{AdbcVersion, OptionDatabase, OptionValue},
48 Connection, Database, Driver, Statement, Optionable,
49};
50use arrow_array::{
51 Array, Int8Array, Int16Array, Int32Array, Int64Array,
52 Float32Array, Float64Array, BooleanArray, StringArray,
53 BinaryArray, TimestampNanosecondArray,
54};
55use arrow_schema::{Schema, DataType, Field};
56use async_trait::async_trait;
57use std::collections::HashMap;
58use std::sync::Arc;
59use std::sync::atomic::{AtomicU64, Ordering};
60use tokio::sync::Mutex;
61use tonic::Status;
62use crate::aggregation::{AggregateFunction, GroupBy, AggregateResult, build_aggregate_query};
63use crate::storage::table_manager::{TableManager, AggregationView};
64use crate::config::Credentials;
65use crate::metrics::MetricRecord;
66use crate::storage::StorageBackend;
67use crate::storage::cache::{CacheManager, CacheEviction};
68use arrow_array::ArrayRef;
69use arrow_array::RecordBatch;
70use crate::aggregation::TimeWindow;
71use crate::storage::BatchAggregation;
72use std::time::Duration;
73use hex;
74
/// Storage backend that talks to a database through a dynamically loaded ADBC driver.
pub struct AdbcBackend {
    /// The single ADBC connection, guarded by an async mutex.
    conn: Arc<Mutex<ManagedConnection>>,
    /// Source of handle values for `prepare_sql`; incremented once per prepared statement.
    statement_counter: AtomicU64,
    /// `(handle, SQL text)` pairs recorded by `prepare_sql` and looked up by `query_sql`.
    prepared_statements: Arc<Mutex<Vec<(u64, String)>>>,
    /// Consulted before inserts and queries to decide whether an eviction query should run.
    cache_manager: CacheManager,
    /// Handed out by reference from `table_manager()`.
    table_manager: TableManager,
}
82
83#[async_trait]
84impl CacheEviction for AdbcBackend {
85 async fn execute_eviction(&self, query: &str) -> Result<(), Status> {
86 let conn = self.conn.clone();
87 let query = query.to_string(); tokio::spawn(async move {
89 let mut conn_guard = conn.lock().await;
90 if let Err(e) = conn_guard.new_statement()
91 .and_then(|mut stmt| {
92 stmt.set_sql_query(&query)?;
93 stmt.execute_update()
94 }) {
95 tracing::error!("Background eviction error: {}", e);
96 }
97 });
98 Ok(())
99 }
100}
101
102impl AdbcBackend {
103 pub fn new(driver_path: &str, connection: Option<&str>, credentials: Option<&Credentials>) -> Result<Self, Status> {
104 let mut driver = ManagedDriver::load_dynamic_from_filename(
105 driver_path,
106 None,
107 AdbcVersion::V100,
108 ).map_err(|e| Status::internal(format!("Failed to load ADBC driver: {}", e)))?;
109
110 let mut database = driver.new_database()
111 .map_err(|e| Status::internal(format!("Failed to create database: {}", e)))?;
112
113 if let Some(conn_str) = connection {
115 database.set_option(OptionDatabase::Uri, OptionValue::String(conn_str.to_string()))
116 .map_err(|e| Status::internal(format!("Failed to set connection string: {}", e)))?;
117 }
118
119 if let Some(creds) = credentials {
121 database.set_option(OptionDatabase::Username, OptionValue::String(creds.username.clone()))
122 .map_err(|e| Status::internal(format!("Failed to set username: {}", e)))?;
123
124 database.set_option(OptionDatabase::Password, OptionValue::String(creds.password.clone()))
125 .map_err(|e| Status::internal(format!("Failed to set password: {}", e)))?;
126 }
127
128 let connection = database.new_connection()
129 .map_err(|e| Status::internal(format!("Failed to create connection: {}", e)))?;
130
131 Ok(Self {
132 conn: Arc::new(Mutex::new(connection)),
133 statement_counter: AtomicU64::new(0),
134 prepared_statements: Arc::new(Mutex::new(Vec::new())),
135 cache_manager: CacheManager::new(None), table_manager: TableManager::new(),
137 })
138 }
139
140 async fn get_connection(&self) -> Result<tokio::sync::MutexGuard<'_, ManagedConnection>, Status> {
141 Ok(self.conn.lock().await)
142 }
143
144 async fn execute_statement(&self, conn: &mut ManagedConnection, query: &str) -> Result<(), Status> {
145 let mut stmt = conn.new_statement()
146 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
147
148 stmt.set_sql_query(query)
149 .map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
150
151 stmt.execute_update()
152 .map_err(|e| Status::internal(format!("Failed to execute statement: {}", e)))?;
153
154 Ok(())
155 }
156
157 async fn execute_query(&self, conn: &mut ManagedConnection, query: &str, params: Option<RecordBatch>) -> Result<Vec<MetricRecord>, Status> {
158 let mut stmt = conn.new_statement()
159 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
160
161 stmt.set_sql_query(query)
162 .map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
163
164 if let Some(batch) = params {
165 let mut bind_stmt = conn.new_statement()
167 .map_err(|e| Status::internal(format!("Failed to create bind statement: {}", e)))?;
168
169 let mut param_values = Vec::new();
171 for i in 0..batch.num_rows() {
172 for j in 0..batch.num_columns() {
173 let col = batch.column(j);
174 match col.data_type() {
175 DataType::Int64 => {
176 let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
177 param_values.push(array.value(i).to_string());
178 }
179 DataType::Float64 => {
180 let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
181 param_values.push(array.value(i).to_string());
182 }
183 DataType::Utf8 => {
184 let array = col.as_any().downcast_ref::<StringArray>().unwrap();
185 param_values.push(format!("'{}'", array.value(i)));
186 }
187 _ => return Err(Status::internal("Unsupported parameter type")),
188 }
189 }
190 }
191
192 let params_sql = format!("VALUES ({})", param_values.join(", "));
193 bind_stmt.set_sql_query(¶ms_sql)
194 .map_err(|e| Status::internal(format!("Failed to set parameters: {}", e)))?;
195
196 let mut bind_result = bind_stmt.execute()
197 .map_err(|e| Status::internal(format!("Failed to execute parameter binding: {}", e)))?;
198
199 while let Some(batch_result) = bind_result.next() {
200 let _ = batch_result.map_err(|e| Status::internal(format!("Failed to bind parameters: {}", e)))?;
201 }
202 }
203
204 let mut reader = stmt.execute()
205 .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
206
207 let mut metrics = Vec::new();
208 while let Some(batch_result) = reader.next() {
209 let batch = batch_result.map_err(|e| Status::internal(format!("Failed to get next batch: {}", e)))?;
210
211 let metric_ids = batch.column_by_name("metric_id")
212 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
213 .ok_or_else(|| Status::internal("Invalid metric_id column"))?;
214
215 let timestamps = batch.column_by_name("timestamp")
216 .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
217 .ok_or_else(|| Status::internal("Invalid timestamp column"))?;
218
219 let sums = batch.column_by_name("value_running_window_sum")
220 .and_then(|col| col.as_any().downcast_ref::<Float64Array>())
221 .ok_or_else(|| Status::internal("Invalid value_running_window_sum column"))?;
222
223 let avgs = batch.column_by_name("value_running_window_avg")
224 .and_then(|col| col.as_any().downcast_ref::<Float64Array>())
225 .ok_or_else(|| Status::internal("Invalid value_running_window_avg column"))?;
226
227 let counts = batch.column_by_name("value_running_window_count")
228 .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
229 .ok_or_else(|| Status::internal("Invalid value_running_window_count column"))?;
230
231 for i in 0..batch.num_rows() {
232 metrics.push(MetricRecord {
233 metric_id: metric_ids.value(i).to_string(),
234 timestamp: timestamps.value(i),
235 value_running_window_sum: sums.value(i),
236 value_running_window_avg: avgs.value(i),
237 value_running_window_count: counts.value(i),
238 });
239 }
240 }
241
242 Ok(metrics)
243 }
244
245 fn prepare_timestamp_param(timestamp: i64) -> Result<RecordBatch, Status> {
246 let schema = Arc::new(Schema::new(vec![
247 Field::new("timestamp", DataType::Int64, false),
248 ]));
249
250 let timestamps: ArrayRef = Arc::new(Int64Array::from(vec![timestamp]));
251
252 RecordBatch::try_new(schema, vec![timestamps])
253 .map_err(|e| Status::internal(format!("Failed to create parameter batch: {}", e)))
254 }
255
256 fn prepare_params(metrics: &[MetricRecord]) -> Result<RecordBatch, Status> {
257 let schema = Arc::new(Schema::new(vec![
258 Field::new("metric_id", DataType::Utf8, false),
259 Field::new("timestamp", DataType::Int64, false),
260 Field::new("value_running_window_sum", DataType::Float64, false),
261 Field::new("value_running_window_avg", DataType::Float64, false),
262 Field::new("value_running_window_count", DataType::Int64, false),
263 ]));
264
265 let metric_ids = StringArray::from_iter_values(metrics.iter().map(|m| m.metric_id.as_str()));
266 let timestamps = Int64Array::from_iter_values(metrics.iter().map(|m| m.timestamp));
267 let sums = Float64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_sum));
268 let avgs = Float64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_avg));
269 let counts = Int64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_count));
270
271 let arrays: Vec<ArrayRef> = vec![
272 Arc::new(metric_ids),
273 Arc::new(timestamps),
274 Arc::new(sums),
275 Arc::new(avgs),
276 Arc::new(counts),
277 ];
278
279 RecordBatch::try_new(schema, arrays)
280 .map_err(|e| Status::internal(format!("Failed to create parameter batch: {}", e)))
281 }
282
283 async fn insert_batch_optimized(&self, metrics: &[MetricRecord], _window: TimeWindow) -> Result<(), Status> {
285 self.begin_transaction().await?;
287 let mut conn = self.conn.lock().await;
288
289 let batch = Self::prepare_params(metrics)?;
291 let sql = self.build_insert_sql("metrics", &batch);
292 let mut stmt = conn.new_statement()
293 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
294
295 stmt.set_sql_query(&sql)
296 .map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
297
298 let mut bind_stmt = conn.new_statement()
300 .map_err(|e| Status::internal(format!("Failed to create bind statement: {}", e)))?;
301
302 let mut param_values = Vec::new();
303 for i in 0..batch.num_rows() {
304 for j in 0..batch.num_columns() {
305 let col = batch.column(j);
306 match col.data_type() {
307 DataType::Int64 => {
308 let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
309 param_values.push(array.value(i).to_string());
310 }
311 DataType::Float64 => {
312 let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
313 param_values.push(array.value(i).to_string());
314 }
315 DataType::Utf8 => {
316 let array = col.as_any().downcast_ref::<StringArray>().unwrap();
317 param_values.push(format!("'{}'", array.value(i)));
318 }
319 _ => return Err(Status::internal("Unsupported parameter type")),
320 }
321 }
322 }
323
324 let params_sql = format!("VALUES ({})", param_values.join(", "));
325 bind_stmt.set_sql_query(¶ms_sql)
326 .map_err(|e| Status::internal(format!("Failed to set parameters: {}", e)))?;
327
328 let mut bind_result = bind_stmt.execute()
329 .map_err(|e| Status::internal(format!("Failed to execute parameter binding: {}", e)))?;
330
331 while let Some(batch_result) = bind_result.next() {
332 let _ = batch_result.map_err(|e| Status::internal(format!("Failed to bind parameters: {}", e)))?;
333 }
334
335 stmt.execute_update()
336 .map_err(|e| Status::internal(format!("Failed to insert metrics: {}", e)))?;
337
338 self.commit_transaction().await?;
340
341 Ok(())
342 }
343
344 fn prepare_aggregation_params(agg: &BatchAggregation) -> Result<RecordBatch, Status> {
346 let schema = Arc::new(Schema::new(vec![
347 Field::new("metric_id", DataType::Utf8, false),
348 Field::new("window_start", DataType::Int64, false),
349 Field::new("window_end", DataType::Int64, false),
350 Field::new("running_sum", DataType::Float64, false),
351 Field::new("running_count", DataType::Int64, false),
352 Field::new("min_value", DataType::Float64, false),
353 Field::new("max_value", DataType::Float64, false),
354 ]));
355
356 let arrays: Vec<ArrayRef> = vec![
357 Arc::new(StringArray::from(vec![agg.metric_id.as_str()])),
358 Arc::new(Int64Array::from(vec![agg.window_start])),
359 Arc::new(Int64Array::from(vec![agg.window_end])),
360 Arc::new(Float64Array::from(vec![agg.running_sum])),
361 Arc::new(Int64Array::from(vec![agg.running_count])),
362 Arc::new(Float64Array::from(vec![agg.min_value])),
363 Arc::new(Float64Array::from(vec![agg.max_value])),
364 ];
365
366 RecordBatch::try_new(schema, arrays)
367 .map_err(|e| Status::internal(format!("Failed to create aggregation batch: {}", e)))
368 }
369
370 async fn begin_transaction(&self) -> Result<(), Status> {
371 let mut conn = self.conn.lock().await;
372 let mut stmt = conn.new_statement()
373 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
374
375 stmt.set_sql_query("BEGIN")
376 .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
377
378 stmt.execute_update()
379 .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
380
381 Ok(())
382 }
383
384 async fn commit_transaction(&self) -> Result<(), Status> {
385 let mut conn = self.conn.lock().await;
386 let mut stmt = conn.new_statement()
387 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
388
389 stmt.set_sql_query("COMMIT")
390 .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
391
392 stmt.execute_update()
393 .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
394
395 Ok(())
396 }
397
398 async fn rollback_transaction(&self, conn: &mut ManagedConnection) -> Result<(), Status> {
399 self.execute_statement(conn, "ROLLBACK").await
400 }
401
402 async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status> {
403 let mut conn = self.conn.lock().await;
404 let sql = self.build_create_table_sql(table_name, schema);
405 self.execute_statement(&mut conn, &sql).await
406 }
407
408 async fn create_view(&self, view: &AggregationView, sql: &str) -> Result<(), Status> {
409 let mut conn = self.conn.lock().await;
410 let create_view_sql = format!("CREATE VIEW {} AS {}", view.source_table, sql);
411 self.execute_statement(&mut conn, &create_view_sql).await
412 }
413
414 async fn drop_table(&self, table_name: &str) -> Result<(), Status> {
415 let mut conn = self.conn.lock().await;
416 let sql = format!("DROP TABLE IF EXISTS {}", table_name);
417 self.execute_statement(&mut conn, &sql).await
418 }
419
420 async fn drop_view(&self, view_name: &str) -> Result<(), Status> {
421 let mut conn = self.conn.lock().await;
422 let sql = format!("DROP VIEW IF EXISTS {}", view_name);
423 self.execute_statement(&mut conn, &sql).await
424 }
425
426 fn build_create_table_sql(&self, table_name: &str, schema: &Schema) -> String {
427 let mut sql = format!("CREATE TABLE IF NOT EXISTS {} (", table_name);
428 let mut first = true;
429
430 for field in schema.fields() {
431 if !first {
432 sql.push_str(", ");
433 }
434 first = false;
435
436 sql.push_str(&format!("{} {}", field.name(), self.arrow_type_to_sql_type(field.data_type())));
437 }
438
439 sql.push_str(")");
440 sql
441 }
442
443 fn build_insert_sql(&self, table_name: &str, batch: &RecordBatch) -> String {
444 let mut sql = format!("INSERT INTO {} (", table_name);
445 let mut first = true;
446
447 for field in batch.schema().fields() {
448 if !first {
449 sql.push_str(", ");
450 }
451 first = false;
452 sql.push_str(field.name());
453 }
454
455 sql.push_str(") VALUES (");
456 first = true;
457
458 for i in 0..batch.num_columns() {
459 if !first {
460 sql.push_str(", ");
461 }
462 first = false;
463 sql.push('?');
464 }
465
466 sql.push(')');
467 sql
468 }
469
470 fn arrow_type_to_sql_type(&self, data_type: &DataType) -> &'static str {
471 match data_type {
472 DataType::Boolean => "BOOLEAN",
473 DataType::Int8 => "TINYINT",
474 DataType::Int16 => "SMALLINT",
475 DataType::Int32 => "INTEGER",
476 DataType::Int64 => "BIGINT",
477 DataType::UInt8 => "TINYINT UNSIGNED",
478 DataType::UInt16 => "SMALLINT UNSIGNED",
479 DataType::UInt32 => "INTEGER UNSIGNED",
480 DataType::UInt64 => "BIGINT UNSIGNED",
481 DataType::Float32 => "FLOAT",
482 DataType::Float64 => "DOUBLE",
483 DataType::Utf8 => "VARCHAR",
484 DataType::Binary => "BLOB",
485 DataType::Date32 => "DATE",
486 DataType::Date64 => "DATE",
487 DataType::Time32(_) => "TIME",
488 DataType::Time64(_) => "TIME",
489 DataType::Timestamp(_, _) => "TIMESTAMP",
490 _ => "VARCHAR",
491 }
492 }
493}
494
495#[async_trait]
496impl StorageBackend for AdbcBackend {
497 async fn init(&self) -> Result<(), Status> {
498 let mut conn = self.conn.lock().await;
499
500 let mut stmt = conn.new_statement()
502 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
503
504 stmt.set_sql_query(r#"
505 CREATE TABLE IF NOT EXISTS metrics (
506 metric_id VARCHAR NOT NULL,
507 timestamp BIGINT NOT NULL,
508 value_running_window_sum DOUBLE PRECISION NOT NULL,
509 value_running_window_avg DOUBLE PRECISION NOT NULL,
510 value_running_window_count BIGINT NOT NULL,
511 PRIMARY KEY (metric_id, timestamp)
512 );
513
514 CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);
515
516 CREATE TABLE IF NOT EXISTS metric_aggregations (
517 metric_id VARCHAR NOT NULL,
518 window_start BIGINT NOT NULL,
519 window_end BIGINT NOT NULL,
520 running_sum DOUBLE PRECISION NOT NULL,
521 running_count BIGINT NOT NULL,
522 min_value DOUBLE PRECISION NOT NULL,
523 max_value DOUBLE PRECISION NOT NULL,
524 PRIMARY KEY (metric_id, window_start, window_end)
525 );
526
527 CREATE INDEX IF NOT EXISTS idx_aggregations_window
528 ON metric_aggregations(window_start, window_end);
529 "#).map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
530
531 stmt.execute_update()
532 .map_err(|e| Status::internal(format!("Failed to create tables: {}", e)))?;
533
534 Ok(())
535 }
536
537 async fn insert_metrics(&self, metrics: Vec<MetricRecord>) -> Result<(), Status> {
538 if metrics.is_empty() {
539 return Ok(());
540 }
541
542 if let Some(cutoff) = self.cache_manager.should_evict().await? {
544 let query = self.cache_manager.eviction_query(cutoff);
545 self.execute_eviction(&query).await?;
546 }
547
548 let window = TimeWindow::Sliding {
550 window: Duration::from_secs(3600), slide: Duration::from_secs(60), };
553
554 self.insert_batch_optimized(&metrics, window).await
556 }
557
558 async fn query_metrics(&self, from_timestamp: i64) -> Result<Vec<MetricRecord>, Status> {
559 if let Some(cutoff) = self.cache_manager.should_evict().await? {
561 let query = self.cache_manager.eviction_query(cutoff);
562 self.execute_eviction(&query).await?;
563 }
564
565 let mut conn = self.conn.lock().await;
566
567 let query = r#"
568 SELECT
569 metric_id,
570 timestamp,
571 value_running_window_sum,
572 value_running_window_avg,
573 value_running_window_count
574 FROM metrics
575 WHERE timestamp >= ?
576 ORDER BY timestamp ASC
577 "#;
578
579 let params = Self::prepare_timestamp_param(from_timestamp)?;
580 self.execute_query(&mut conn, query, Some(params)).await
581 }
582
583 async fn prepare_sql(&self, query: &str) -> Result<Vec<u8>, Status> {
584 let handle = self.statement_counter.fetch_add(1, Ordering::SeqCst);
585 let mut statements = self.prepared_statements.lock().await;
586 statements.push((handle, query.to_string()));
587 Ok(handle.to_le_bytes().to_vec())
588 }
589
590 async fn query_sql(&self, statement_handle: &[u8]) -> Result<Vec<MetricRecord>, Status> {
591 let handle = u64::from_le_bytes(
592 statement_handle.try_into()
593 .map_err(|_| Status::invalid_argument("Invalid statement handle"))?
594 );
595
596 let statements = self.prepared_statements.lock().await;
597 let sql = statements
598 .iter()
599 .find(|(h, _)| *h == handle)
600 .map(|(_, sql)| sql.as_str())
601 .ok_or_else(|| Status::invalid_argument("Statement handle not found"))?;
602
603 let mut conn = self.conn.lock().await;
604 self.execute_query(&mut conn, sql, None).await
605 }
606
607 async fn aggregate_metrics(
608 &self,
609 function: AggregateFunction,
610 group_by: &GroupBy,
611 from_timestamp: i64,
612 to_timestamp: Option<i64>,
613 ) -> Result<Vec<AggregateResult>, Status> {
614 if let Some(cutoff) = self.cache_manager.should_evict().await? {
616 let query = self.cache_manager.eviction_query(cutoff);
617 self.execute_eviction(&query).await?;
618 }
619
620 const DEFAULT_COLUMNS: [&str; 5] = [
621 "metric_id",
622 "timestamp",
623 "value_running_window_sum",
624 "value_running_window_avg",
625 "value_running_window_count"
626 ];
627
628 let query = build_aggregate_query(
629 "metrics",
630 function,
631 group_by,
632 &DEFAULT_COLUMNS,
633 Some(from_timestamp),
634 to_timestamp,
635 );
636 let mut conn = self.conn.lock().await;
637 let metrics = self.execute_query(&mut conn, &query, None).await?;
638
639 let mut results = Vec::new();
640 for metric in metrics {
641 let result = AggregateResult {
642 value: metric.value_running_window_sum,
643 timestamp: metric.timestamp,
644 };
646 results.push(result);
647 }
648
649 Ok(results)
650 }
651
652 fn new_with_options(
653 connection_string: &str,
654 options: &HashMap<String, String>,
655 credentials: Option<&Credentials>,
656 ) -> Result<Self, Status> {
657 let driver_path = options.get("driver_path")
658 .ok_or_else(|| Status::invalid_argument("driver_path is required"))?;
659
660 let mut driver = ManagedDriver::load_dynamic_from_filename(
661 driver_path,
662 None,
663 AdbcVersion::V100,
664 ).map_err(|e| Status::internal(format!("Failed to load ADBC driver: {}", e)))?;
665
666 let mut database = driver.new_database()
667 .map_err(|e| Status::internal(format!("Failed to create database: {}", e)))?;
668
669 database.set_option(OptionDatabase::Uri, OptionValue::String(connection_string.to_string()))
671 .map_err(|e| Status::internal(format!("Failed to set connection string: {}", e)))?;
672
673 if let Some(creds) = credentials {
675 database.set_option(OptionDatabase::Username, OptionValue::String(creds.username.clone()))
676 .map_err(|e| Status::internal(format!("Failed to set username: {}", e)))?;
677
678 database.set_option(OptionDatabase::Password, OptionValue::String(creds.password.clone()))
679 .map_err(|e| Status::internal(format!("Failed to set password: {}", e)))?;
680 }
681
682 let connection = database.new_connection()
683 .map_err(|e| Status::internal(format!("Failed to create connection: {}", e)))?;
684
685 Ok(Self {
686 conn: Arc::new(Mutex::new(connection)),
687 statement_counter: AtomicU64::new(0),
688 prepared_statements: Arc::new(Mutex::new(Vec::new())),
689 cache_manager: CacheManager::new(None), table_manager: TableManager::new(),
691 })
692 }
693
694 async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status> {
695 let mut conn = self.conn.lock().await;
696 let sql = self.build_create_table_sql(table_name, schema);
697 self.execute_statement(&mut conn, &sql).await
698 }
699
700 async fn insert_into_table(&self, table_name: &str, batch: RecordBatch) -> Result<(), Status> {
701 let mut conn = self.conn.lock().await;
702 let sql = self.build_insert_sql(table_name, &batch);
703 self.execute_statement(&mut conn, &sql).await
704 }
705
706 async fn query_table(&self, table_name: &str, projection: Option<Vec<String>>) -> Result<RecordBatch, Status> {
707 let mut conn = self.conn.lock().await;
708 let mut stmt = conn.new_statement()
709 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
710
711 let columns = projection.map(|cols| cols.join(", ")).unwrap_or_else(|| "*".to_string());
712 let sql = format!("SELECT {} FROM {}", columns, table_name);
713
714 stmt.set_sql_query(&sql)
715 .map_err(|e| Status::internal(format!("Failed to set query: {}", e)))?;
716
717 let mut reader = stmt.execute()
718 .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
719
720 let batch = reader.next()
721 .ok_or_else(|| Status::internal("No data returned"))?
722 .map_err(|e| Status::internal(format!("Failed to read record batch: {}", e)))?;
723
724 let schema = batch.schema();
726 let mut arrays = Vec::with_capacity(batch.num_columns());
727
728 for _i in 0..batch.num_columns() {
729 let col = batch.column(_i);
730 let array: ArrayRef = match col.data_type() {
731 &duckdb::arrow::datatypes::DataType::Int64 => {
732 Arc::new(col.as_any().downcast_ref::<Int64Array>().unwrap().clone())
733 },
734 &duckdb::arrow::datatypes::DataType::Float64 => {
735 Arc::new(col.as_any().downcast_ref::<Float64Array>().unwrap().clone())
736 },
737 &duckdb::arrow::datatypes::DataType::Utf8 => {
738 Arc::new(col.as_any().downcast_ref::<StringArray>().unwrap().clone())
739 },
740 _ => return Err(Status::internal("Unsupported column type")),
741 };
742 arrays.push(array);
743 }
744
745 let fields: Vec<Field> = schema.fields().iter().map(|f| {
747 Field::new(
748 f.name(),
749 match f.data_type() {
750 &duckdb::arrow::datatypes::DataType::Int64 => DataType::Int64,
751 &duckdb::arrow::datatypes::DataType::Float64 => DataType::Float64,
752 &duckdb::arrow::datatypes::DataType::Utf8 => DataType::Utf8,
753 _ => DataType::Utf8, },
755 f.is_nullable()
756 )
757 }).collect();
758
759 let arrow_schema = Schema::new(fields);
760 RecordBatch::try_new(Arc::new(arrow_schema), arrays)
761 .map_err(|e| Status::internal(format!("Failed to create record batch: {}", e)))
762 }
763
764 async fn create_aggregation_view(&self, view: &AggregationView) -> Result<(), Status> {
765 let columns: Vec<&str> = view.aggregate_columns.iter()
766 .map(|s| s.as_str())
767 .collect();
768
769 let sql = build_aggregate_query(
770 &view.source_table,
771 view.function,
772 &view.group_by,
773 &columns,
774 None,
775 None
776 );
777
778 let mut conn = self.conn.lock().await;
779 let mut stmt = conn.new_statement()
780 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
781
782 stmt.set_sql_query(&format!("CREATE VIEW {} AS {}", view.source_table, sql))
783 .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
784
785 stmt.execute_update()
786 .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
787
788 Ok(())
789 }
790
791 async fn query_aggregation_view(&self, view_name: &str) -> Result<RecordBatch, Status> {
792 let sql = format!("SELECT * FROM {}", view_name);
793
794 let mut conn = self.conn.lock().await;
795 let mut stmt = conn.new_statement()
796 .map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
797
798 stmt.set_sql_query(&sql)
799 .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
800
801 let mut reader = stmt.execute()
802 .map_err(|e| Status::internal(format!("Failed to execute query: {}", e)))?;
803
804 let batch = reader.next()
805 .ok_or_else(|| Status::internal("No data returned"))?
806 .map_err(|e| Status::internal(format!("Failed to read record batch: {}", e)))?;
807
808 let schema = batch.schema();
810 let mut arrays = Vec::with_capacity(batch.num_columns());
811
812 for i in 0..batch.num_columns() {
813 let col = batch.column(i);
814 let array: ArrayRef = match col.data_type() {
815 &duckdb::arrow::datatypes::DataType::Int64 => {
816 Arc::new(col.as_any().downcast_ref::<Int64Array>().unwrap().clone())
817 },
818 &duckdb::arrow::datatypes::DataType::Float64 => {
819 Arc::new(col.as_any().downcast_ref::<Float64Array>().unwrap().clone())
820 },
821 &duckdb::arrow::datatypes::DataType::Utf8 => {
822 Arc::new(col.as_any().downcast_ref::<StringArray>().unwrap().clone())
823 },
824 _ => return Err(Status::internal("Unsupported column type")),
825 };
826 arrays.push(array);
827 }
828
829 let fields: Vec<Field> = schema.fields().iter().map(|f| {
831 Field::new(
832 f.name(),
833 match f.data_type() {
834 &duckdb::arrow::datatypes::DataType::Int64 => DataType::Int64,
835 &duckdb::arrow::datatypes::DataType::Float64 => DataType::Float64,
836 &duckdb::arrow::datatypes::DataType::Utf8 => DataType::Utf8,
837 _ => DataType::Utf8, },
839 f.is_nullable()
840 )
841 }).collect();
842
843 let arrow_schema = Schema::new(fields);
844 RecordBatch::try_new(Arc::new(arrow_schema), arrays)
845 .map_err(|e| Status::internal(format!("Failed to create record batch: {}", e)))
846 }
847
848 async fn drop_table(&self, table_name: &str) -> Result<(), Status> {
849 let mut conn = self.conn.lock().await;
850 let mut stmt = conn.new_statement().map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
851 stmt.set_sql_query(&format!("DROP TABLE IF EXISTS {}", table_name))
852 .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
853 stmt.execute_update().map_err(|e| Status::internal(format!("Failed to drop table: {}", e)))?;
854 Ok(())
855 }
856
857 async fn drop_aggregation_view(&self, view_name: &str) -> Result<(), Status> {
858 let mut conn = self.conn.lock().await;
859 let mut stmt = conn.new_statement().map_err(|e| Status::internal(format!("Failed to create statement: {}", e)))?;
860 stmt.set_sql_query(&format!("DROP VIEW IF EXISTS {}", view_name))
861 .map_err(|e| Status::internal(format!("Failed to set SQL query: {}", e)))?;
862 stmt.execute_update().map_err(|e| Status::internal(format!("Failed to drop view: {}", e)))?;
863 Ok(())
864 }
865
866 fn table_manager(&self) -> &TableManager {
867 &self.table_manager
868 }
869}
870
871fn format_value(array: &dyn Array, index: usize) -> String {
872 match array.data_type() {
873 DataType::Int8 => format!("{}", array.as_any().downcast_ref::<Int8Array>().unwrap().value(index)),
874 DataType::Int16 => format!("{}", array.as_any().downcast_ref::<Int16Array>().unwrap().value(index)),
875 DataType::Int32 => format!("{}", array.as_any().downcast_ref::<Int32Array>().unwrap().value(index)),
876 DataType::Int64 => format!("{}", array.as_any().downcast_ref::<Int64Array>().unwrap().value(index)),
877 DataType::Float32 => format!("{}", array.as_any().downcast_ref::<Float32Array>().unwrap().value(index)),
878 DataType::Float64 => format!("{}", array.as_any().downcast_ref::<Float64Array>().unwrap().value(index)),
879 DataType::Boolean => format!("{}", array.as_any().downcast_ref::<BooleanArray>().unwrap().value(index)),
880 DataType::Utf8 => format!("'{}'", array.as_any().downcast_ref::<StringArray>().unwrap().value(index)),
881 DataType::Binary => format!("X'{}'", hex::encode(array.as_any().downcast_ref::<BinaryArray>().unwrap().value(index))),
882 DataType::Timestamp(_, _) => {
883 let ts = array.as_any().downcast_ref::<TimestampNanosecondArray>().unwrap().value(index);
884 let seconds = ts / 1_000_000_000;
885 let nanos = (ts % 1_000_000_000) as u32;
886 format!("'{}'", chrono::DateTime::from_timestamp(seconds, nanos)
887 .unwrap_or_default()
888 .naive_utc())
889 },
890 _ => "NULL".to_string(),
891 }
892}