use std::collections::HashMap;
use std::sync::Arc;
use duckdb::{Connection, Config, params, ToSql};
use tokio::sync::Mutex;
use tonic::Status;
use crate::metrics::MetricRecord;
use crate::config::Credentials;
use crate::storage::{StorageBackend, BatchAggregation};
use crate::storage::cache::{CacheManager, CacheEviction};
use crate::storage::table_manager::{TableManager, AggregationView};
use crate::aggregation::{TimeWindow, AggregateFunction, GroupBy, AggregateResult, build_aggregate_query};
use async_trait::async_trait;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::array::{
    Array, ArrayRef, RecordBatch, Int64Array, Float64Array, StringArray,
};
use arrow::array::builder::{
    ArrayBuilder, Int64Builder, Float64Builder, StringBuilder,
};
use std::time::Duration;

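/// DuckDB-backed implementation of the storage layer.
///
/// A single `Connection` is shared behind an async `Mutex`; cache eviction and
/// table/view bookkeeping are delegated to `CacheManager` and `TableManager`.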
#[derive(Clone)]
pub struct DuckDbBackend {
    conn: Arc<Mutex<Connection>>,
    connection_string: String,
    options: HashMap<String, String>,
    cache_manager: CacheManager,
    table_manager: TableManager,
}

impl DuckDbBackend {
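    /// Opens (or creates) the database at `connection_string` and spawns a
    /// background task that initializes the metric tables.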
    pub fn new(connection_string: String, options: HashMap<String, String>, ttl: Option<u64>) -> Result<Self, Status> {
        let config = Config::default();
        let conn = Connection::open_with_flags(&connection_string, config)
            .map_err(|e| Status::internal(e.to_string()))?;

        let backend = Self {
            conn: Arc::new(Mutex::new(conn)),
            connection_string,
            options,
            cache_manager: CacheManager::new(ttl),
            table_manager: TableManager::new(),
        };

        let backend_clone = backend.clone();
        tokio::spawn(async move {
            if let Err(e) = backend_clone.init().await {
                tracing::error!("Failed to initialize tables: {}", e);
            }
        });

        Ok(backend)
    }

    pub fn new_in_memory() -> Result<Self, Status> {
        Self::new(":memory:".to_string(), HashMap::new(), Some(0))
    }

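    /// Inserts a batch of metrics and upserts the per-metric aggregation rows
    /// for the given time window, all inside a single transaction.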
    async fn insert_batch_optimized(&self, metrics: &[MetricRecord], window: TimeWindow) -> Result<(), Status> {
        let conn = self.conn.lock().await;

        conn.execute("BEGIN TRANSACTION", params![])
            .map_err(|e| Status::internal(format!("Failed to begin transaction: {}", e)))?;

        let batch = Self::prepare_params(metrics)?;

        let mut stmt = conn.prepare(r#"
            INSERT INTO metrics (
                metric_id,
                timestamp,
                value_running_window_sum,
                value_running_window_avg,
                value_running_window_count
            ) VALUES (?, ?, ?, ?, ?)
        "#).map_err(|e| Status::internal(format!("Failed to prepare statement: {}", e)))?;

        for i in 0..batch.num_rows() {
            let metric_id = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap().value(i);
            let timestamp = batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().value(i);
            let sum = batch.column(2).as_any().downcast_ref::<Float64Array>().unwrap().value(i);
            let avg = batch.column(3).as_any().downcast_ref::<Float64Array>().unwrap().value(i);
            let count = batch.column(4).as_any().downcast_ref::<Int64Array>().unwrap().value(i);

            stmt.execute(params![
                metric_id,
                timestamp,
                sum,
                avg,
                count,
            ]).map_err(|e| Status::internal(format!("Failed to insert metrics: {}", e)))?;
        }

        let window_start = match window {
            TimeWindow::Sliding { window, slide: _ } => {
                let now = metrics.iter().map(|m| m.timestamp).max().unwrap_or(0);
                now - window.as_nanos() as i64
            }
            TimeWindow::Fixed(start) => start.as_nanos() as i64,
            TimeWindow::None => metrics.iter().map(|m| m.timestamp).min().unwrap_or(0),
        };

        let window_end = match window {
            TimeWindow::Sliding { window: _, slide: _ } => {
                metrics.iter().map(|m| m.timestamp).max().unwrap_or(0)
            }
            TimeWindow::Fixed(end) => end.as_nanos() as i64,
            TimeWindow::None => metrics.iter().map(|m| m.timestamp).max().unwrap_or(0),
        };

        let mut aggregations = HashMap::new();
        for metric in metrics {
            let entry = aggregations.entry(metric.metric_id.clone()).or_insert_with(|| BatchAggregation {
                metric_id: metric.metric_id.clone(),
                window_start,
                window_end,
                running_sum: 0.0,
                running_count: 0,
                min_value: f64::INFINITY,
                max_value: f64::NEG_INFINITY,
            });

            entry.running_sum += metric.value_running_window_sum;
            entry.running_count += metric.value_running_window_count as i64;
            entry.min_value = entry.min_value.min(metric.value_running_window_sum);
            entry.max_value = entry.max_value.max(metric.value_running_window_sum);
        }

        let mut agg_stmt = conn.prepare(r#"
            INSERT INTO metric_aggregations (
                metric_id, window_start, window_end,
                running_sum, running_count, min_value, max_value
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (metric_id, window_start, window_end) DO UPDATE
            SET running_sum = metric_aggregations.running_sum + EXCLUDED.running_sum,
                running_count = metric_aggregations.running_count + EXCLUDED.running_count,
                min_value = LEAST(metric_aggregations.min_value, EXCLUDED.min_value),
                max_value = GREATEST(metric_aggregations.max_value, EXCLUDED.max_value)
        "#).map_err(|e| Status::internal(format!("Failed to prepare aggregation statement: {}", e)))?;

        for agg in aggregations.values() {
            agg_stmt.execute(params![
                agg.metric_id,
                agg.window_start,
                agg.window_end,
                agg.running_sum,
                agg.running_count,
                agg.min_value,
                agg.max_value,
            ]).map_err(|e| Status::internal(format!("Failed to update aggregations: {}", e)))?;
        }

        conn.execute("COMMIT", params![])
            .map_err(|e| Status::internal(format!("Failed to commit transaction: {}", e)))?;

        Ok(())
    }

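    /// Converts the metric records into an Arrow `RecordBatch` that mirrors the
    /// column layout of the `metrics` table.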
    fn prepare_params(metrics: &[MetricRecord]) -> Result<RecordBatch, Status> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("metric_id", DataType::Utf8, false),
            Field::new("timestamp", DataType::Int64, false),
            Field::new("value_running_window_sum", DataType::Float64, false),
            Field::new("value_running_window_avg", DataType::Float64, false),
            Field::new("value_running_window_count", DataType::Int64, false),
        ]));

        let metric_ids = StringArray::from_iter_values(metrics.iter().map(|m| m.metric_id.as_str()));
        let timestamps = Int64Array::from_iter_values(metrics.iter().map(|m| m.timestamp));
        let sums = Float64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_sum));
        let avgs = Float64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_avg));
        let counts = Int64Array::from_iter_values(metrics.iter().map(|m| m.value_running_window_count));

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(metric_ids),
            Arc::new(timestamps),
            Arc::new(sums),
            Arc::new(avgs),
            Arc::new(counts),
        ];

        RecordBatch::try_new(schema, arrays)
            .map_err(|e| Status::internal(format!("Failed to create parameter batch: {}", e)))
    }

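    /// Creates the metric tables and indexes one statement at a time; `init`
    /// runs the same DDL as a single batch.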
    async fn create_tables(&self) -> Result<(), Status> {
        let conn = self.conn.lock().await;

        conn.execute(r#"
            CREATE TABLE IF NOT EXISTS metrics (
                metric_id VARCHAR NOT NULL,
                timestamp BIGINT NOT NULL,
                value_running_window_sum DOUBLE NOT NULL,
                value_running_window_avg DOUBLE NOT NULL,
                value_running_window_count BIGINT NOT NULL,
                PRIMARY KEY (metric_id, timestamp)
            )
        "#, params![]).map_err(|e| Status::internal(e.to_string()))?;

        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp)",
            params![]
        ).map_err(|e| Status::internal(e.to_string()))?;

        conn.execute(r#"
            CREATE TABLE IF NOT EXISTS metric_aggregations (
                metric_id VARCHAR NOT NULL,
                window_start BIGINT NOT NULL,
                window_end BIGINT NOT NULL,
                running_sum DOUBLE NOT NULL,
                running_count BIGINT NOT NULL,
                min_value DOUBLE NOT NULL,
                max_value DOUBLE NOT NULL,
                PRIMARY KEY (metric_id, window_start, window_end)
            )
        "#, params![]).map_err(|e| Status::internal(e.to_string()))?;

        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_aggregations_window ON metric_aggregations(window_start, window_end)",
            params![]
        ).map_err(|e| Status::internal(e.to_string()))?;

        Ok(())
    }
}

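/// The eviction SQL is run on a detached background task so the caller is not
/// blocked on the connection lock; failures are only logged.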
#[async_trait]
impl CacheEviction for DuckDbBackend {
    async fn execute_eviction(&self, query: &str) -> Result<(), Status> {
        let conn = self.conn.clone();
        let query = query.to_string();
        tokio::spawn(async move {
            let conn_guard = conn.lock().await;
            if let Err(e) = conn_guard.execute_batch(&query) {
                tracing::error!("Background eviction error: {}", e);
            }
        });
        Ok(())
    }
}

#[async_trait]
impl StorageBackend for DuckDbBackend {
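    /// Creates the `metrics` and `metric_aggregations` tables and their indexes
    /// if they do not already exist.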
    async fn init(&self) -> Result<(), Status> {
        let conn = self.conn.lock().await;

        conn.execute_batch(r#"
            CREATE TABLE IF NOT EXISTS metrics (
                metric_id VARCHAR NOT NULL,
                timestamp BIGINT NOT NULL,
                value_running_window_sum DOUBLE NOT NULL,
                value_running_window_avg DOUBLE NOT NULL,
                value_running_window_count BIGINT NOT NULL,
                PRIMARY KEY (metric_id, timestamp)
            );

            CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);

            CREATE TABLE IF NOT EXISTS metric_aggregations (
                metric_id VARCHAR NOT NULL,
                window_start BIGINT NOT NULL,
                window_end BIGINT NOT NULL,
                running_sum DOUBLE NOT NULL,
                running_count BIGINT NOT NULL,
                min_value DOUBLE NOT NULL,
                max_value DOUBLE NOT NULL,
                PRIMARY KEY (metric_id, window_start, window_end)
            );

            CREATE INDEX IF NOT EXISTS idx_aggregations_window
                ON metric_aggregations(window_start, window_end);
        "#).map_err(|e| Status::internal(format!("Failed to create tables: {}", e)))?;

        Ok(())
    }

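    /// Runs cache eviction when it is due, then writes the batch under a
    /// one-hour sliding window with a sixty-second slide.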
    async fn insert_metrics(&self, metrics: Vec<MetricRecord>) -> Result<(), Status> {
        if metrics.is_empty() {
            return Ok(());
        }

        if let Some(cutoff) = self.cache_manager.should_evict().await? {
            let query = self.cache_manager.eviction_query(cutoff);
            self.execute_eviction(&query).await?;
        }

        let window = TimeWindow::Sliding {
            window: Duration::from_secs(3600),
            slide: Duration::from_secs(60),
        };

        self.insert_batch_optimized(&metrics, window).await
    }

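    /// Returns every metric with `timestamp >= from_timestamp`, ordered by
    /// timestamp ascending.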
    async fn query_metrics(&self, from_timestamp: i64) -> Result<Vec<MetricRecord>, Status> {
        if let Some(cutoff) = self.cache_manager.should_evict().await? {
            let query = self.cache_manager.eviction_query(cutoff);
            self.execute_eviction(&query).await?;
        }

        let query = format!(
            "SELECT metric_id, timestamp, value_running_window_sum, value_running_window_avg, value_running_window_count \
             FROM metrics WHERE timestamp >= {} ORDER BY timestamp ASC",
            from_timestamp
        );

        let conn = self.conn.lock().await;
        let mut stmt = conn.prepare(&query)
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut rows = stmt.query(params![])
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut metrics = Vec::new();
        while let Some(row) = rows.next().map_err(|e| Status::internal(e.to_string()))? {
            metrics.push(MetricRecord {
                metric_id: row.get(0).map_err(|e| Status::internal(e.to_string()))?,
                timestamp: row.get(1).map_err(|e| Status::internal(e.to_string()))?,
                value_running_window_sum: row.get(2).map_err(|e| Status::internal(e.to_string()))?,
                value_running_window_avg: row.get(3).map_err(|e| Status::internal(e.to_string()))?,
                value_running_window_count: row.get(4).map_err(|e| Status::internal(e.to_string()))?,
            });
        }

        Ok(metrics)
    }

    async fn prepare_sql(&self, query: &str) -> Result<Vec<u8>, Status> {
        Ok(query.as_bytes().to_vec())
    }

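    /// Interprets the statement handle as a starting timestamp (falling back to
    /// zero when it does not parse) and delegates to `query_metrics`.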
    async fn query_sql(&self, statement_handle: &[u8]) -> Result<Vec<MetricRecord>, Status> {
        let sql = std::str::from_utf8(statement_handle)
            .map_err(|e| Status::internal(e.to_string()))?;
        self.query_metrics(sql.parse().unwrap_or(0)).await
    }

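    /// Aggregates `value_running_window_sum` with the requested function and
    /// grouping over the optional `[from_timestamp, to_timestamp]` range.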
    async fn aggregate_metrics(
        &self,
        function: AggregateFunction,
        group_by: &GroupBy,
        from_timestamp: i64,
        to_timestamp: Option<i64>,
    ) -> Result<Vec<AggregateResult>, Status> {
        if let Some(cutoff) = self.cache_manager.should_evict().await? {
            let query = self.cache_manager.eviction_query(cutoff);
            self.execute_eviction(&query).await?;
        }

        let query = build_aggregate_query(
            "metrics",
            function,
            group_by,
            &["value_running_window_sum"],
            Some(from_timestamp),
            to_timestamp,
        );

        let conn = self.conn.lock().await;
        let mut stmt = conn.prepare(&query)
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut rows = stmt.query(params![])
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut results = Vec::new();
        while let Some(row) = rows.next().map_err(|e| Status::internal(e.to_string()))? {
            let value: f64 = row.get(0).map_err(|e| Status::internal(e.to_string()))?;
            let timestamp: i64 = row.get(1).map_err(|e| Status::internal(e.to_string()))?;

            results.push(AggregateResult {
                value,
                timestamp,
            });
        }

        Ok(results)
    }

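    /// Builds a backend from connection options, merging in credentials when
    /// provided; a `ttl` option of `0` is passed through as `None`.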
    fn new_with_options(
        connection_string: &str,
        options: &HashMap<String, String>,
        credentials: Option<&Credentials>,
    ) -> Result<Self, Status> {
        let mut all_options = options.clone();
        if let Some(creds) = credentials {
            all_options.insert("username".to_string(), creds.username.clone());
            all_options.insert("password".to_string(), creds.password.clone());
        }

        let ttl = all_options.get("ttl")
            .and_then(|s| s.parse().ok())
            .map(|ttl| if ttl == 0 { None } else { Some(ttl) })
            .unwrap_or(None);

        Self::new(connection_string.to_string(), all_options, ttl)
    }

    async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status> {
        let sql = Self::schema_to_create_table_sql(table_name, schema);
        self.execute(&sql).await?;

        self.table_manager.create_table(table_name.to_string(), schema.clone()).await?;
        Ok(())
    }

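    /// Inserts every row of the batch with positional parameters; only Int64,
    /// Float64, and Utf8 columns are supported.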
    async fn insert_into_table(&self, table_name: &str, batch: RecordBatch) -> Result<(), Status> {
        let conn = self.conn.lock().await;
        let mut stmt = conn.prepare(&format!(
            "INSERT INTO {} VALUES ({})",
            table_name,
            (0..batch.num_columns()).map(|_| "?").collect::<Vec<_>>().join(", ")
        )).map_err(|e| Status::internal(e.to_string()))?;

        for row_idx in 0..batch.num_rows() {
            let mut param_values: Vec<Box<dyn ToSql>> = Vec::new();
            for col_idx in 0..batch.num_columns() {
                let col = batch.column(col_idx);
                match col.data_type() {
                    DataType::Int64 => {
                        let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
                        param_values.push(Box::new(array.value(row_idx)));
                    }
                    DataType::Float64 => {
                        let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
                        param_values.push(Box::new(array.value(row_idx)));
                    }
                    DataType::Utf8 => {
                        let array = col.as_any().downcast_ref::<StringArray>().unwrap();
                        param_values.push(Box::new(array.value(row_idx).to_string()));
                    }
                    _ => return Err(Status::internal("Unsupported column type")),
                }
            }

            let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
            stmt.execute(param_refs.as_slice()).map_err(|e| Status::internal(e.to_string()))?;
        }

        Ok(())
    }

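    /// Reads the projected columns of a table back into a `RecordBatch`; when no
    /// projection is given, all columns are returned in schema order.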
    async fn query_table(&self, table_name: &str, projection: Option<Vec<String>>) -> Result<RecordBatch, Status> {
        let schema = self.table_manager.get_table_schema(table_name).await?;

        // Default to selecting every column in schema order.
        let projection = projection.unwrap_or_else(|| {
            schema.fields().iter().map(|f| f.name().clone()).collect()
        });

        // Restrict the schema to the projected columns so the SQL column order,
        // the builders, and the output batch stay aligned.
        let projected_fields = projection.iter()
            .map(|name| {
                schema.field_with_name(name)
                    .map(|f| f.clone())
                    .map_err(|e| Status::internal(e.to_string()))
            })
            .collect::<Result<Vec<Field>, Status>>()?;
        let projected_schema = Schema::new(projected_fields);

        let mut builders: Vec<Box<dyn ArrayBuilder>> = projected_schema.fields().iter()
            .map(|field| Self::create_array_builder(field))
            .collect();

        let sql = format!(
            "SELECT {} FROM {}",
            projection.join(", "),
            table_name
        );

        let conn = self.conn.lock().await;
        let mut stmt = conn.prepare(&sql)
            .map_err(|e| Status::internal(e.to_string()))?;

        let mut rows = stmt.query(params![])
            .map_err(|e| Status::internal(e.to_string()))?;

        while let Some(row) = rows.next().map_err(|e| Status::internal(e.to_string()))? {
            for (i, field) in projected_schema.fields().iter().enumerate() {
                match field.data_type() {
                    DataType::Int64 => {
                        let builder = builders[i].as_any_mut().downcast_mut::<Int64Builder>().unwrap();
                        match row.get::<usize, i64>(i) {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                    DataType::Float64 => {
                        let builder = builders[i].as_any_mut().downcast_mut::<Float64Builder>().unwrap();
                        match row.get::<usize, f64>(i) {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                    DataType::Utf8 => {
                        let builder = builders[i].as_any_mut().downcast_mut::<StringBuilder>().unwrap();
                        match row.get::<usize, String>(i) {
                            Ok(value) => builder.append_value(value),
                            Err(_) => builder.append_null(),
                        }
                    }
                    _ => return Err(Status::internal("Unsupported column type")),
                }
            }
        }

        let arrays: Vec<ArrayRef> = builders.into_iter()
            .map(|mut builder| builder.finish())
            .collect();

        RecordBatch::try_new(Arc::new(projected_schema), arrays)
            .map_err(|e| Status::internal(format!("Failed to create record batch: {}", e)))
    }

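    /// Creates a SQL view named `agg_view_<source_table>` for the aggregation
    /// and registers it with the table manager.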
    async fn create_aggregation_view(&self, view: &AggregationView) -> Result<(), Status> {
        let columns: Vec<&str> = view.aggregate_columns.iter()
            .map(|s| s.as_str())
            .collect();

        let sql = build_aggregate_query(
            &view.source_table,
            view.function,
            &view.group_by,
            &columns,
            None,
            None,
        );

        let view_name = format!("agg_view_{}", view.source_table);
        let conn = self.conn.lock().await;
        conn.execute(&format!("CREATE VIEW {} AS {}", view_name, sql), params![])
            .map_err(|e| Status::internal(format!("Failed to create view: {}", e)))?;

        self.table_manager.create_aggregation_view(
            view_name,
            view.source_table.clone(),
            view.function.clone(),
            view.group_by.clone(),
            view.window.clone(),
            view.aggregate_columns.clone(),
        ).await?;

        Ok(())
    }

    async fn query_aggregation_view(&self, view_name: &str) -> Result<RecordBatch, Status> {
        self.query_table(view_name, None).await
    }

    async fn drop_table(&self, table_name: &str) -> Result<(), Status> {
        let conn = self.conn.lock().await;
        conn.execute(&format!("DROP TABLE IF EXISTS {}", table_name), params![])
            .map_err(|e| Status::internal(format!("Failed to drop table: {}", e)))?;

        self.table_manager.drop_table(table_name).await?;
        Ok(())
    }

    async fn drop_aggregation_view(&self, view_name: &str) -> Result<(), Status> {
        let conn = self.conn.lock().await;
        conn.execute(&format!("DROP VIEW IF EXISTS {}", view_name), params![])
            .map_err(|e| Status::internal(format!("Failed to drop view: {}", e)))?;

        self.table_manager.drop_aggregation_view(view_name).await?;
        Ok(())
    }

    fn table_manager(&self) -> &TableManager {
        &self.table_manager
    }
}

impl DuckDbBackend {
    async fn execute(&self, query: &str) -> Result<(), Status> {
        let conn = self.conn.lock().await;
        conn.execute(query, params![])
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(())
    }

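    /// Renders a `CREATE TABLE IF NOT EXISTS` statement for the given Arrow schema.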
    fn schema_to_create_table_sql(table_name: &str, schema: &Schema) -> String {
        let mut sql = format!("CREATE TABLE IF NOT EXISTS \"{}\" (", table_name);
        let mut first = true;

        for field in schema.fields() {
            if !first {
                sql.push_str(", ");
            }
            first = false;

            sql.push_str(&format!("\"{}\" {}", field.name(), Self::arrow_type_to_duckdb_type(field.data_type())));
        }

        sql.push_str(")");
        sql
    }

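    /// Maps an Arrow data type to the corresponding DuckDB column type; anything
    /// unrecognized falls back to VARCHAR.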
    fn arrow_type_to_duckdb_type(data_type: &DataType) -> &'static str {
        match data_type {
            DataType::Boolean => "BOOLEAN",
            DataType::Int8 => "TINYINT",
            DataType::Int16 => "SMALLINT",
            DataType::Int32 => "INTEGER",
            DataType::Int64 => "BIGINT",
            DataType::UInt8 => "TINYINT",
            DataType::UInt16 => "SMALLINT",
            DataType::UInt32 => "INTEGER",
            DataType::UInt64 => "BIGINT",
            DataType::Float32 => "REAL",
            DataType::Float64 => "DOUBLE",
            DataType::Utf8 => "VARCHAR",
            DataType::Binary => "BLOB",
            DataType::Date32 => "DATE",
            DataType::Date64 => "DATE",
            DataType::Time32(_) => "TIME",
            DataType::Time64(_) => "TIME",
            DataType::Timestamp(_, _) => "TIMESTAMP",
            _ => "VARCHAR",
        }
    }

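    /// Creates the Arrow array builder matching a field's data type; panics on
    /// unsupported types.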
    fn create_array_builder(field: &Field) -> Box<dyn ArrayBuilder> {
        match field.data_type() {
            DataType::Int64 => Box::new(Int64Builder::new()),
            DataType::Float64 => Box::new(Float64Builder::new()),
            DataType::Utf8 => Box::new(StringBuilder::new()),
            _ => panic!("Unsupported column type"),
        }
    }
}