pub mod adbc;
pub mod duckdb;
pub mod cache;
pub mod table_manager;
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use std::collections::HashMap;
use crate::config::Credentials;
use crate::metrics::MetricRecord;
use crate::storage::table_manager::{TableManager, AggregationView};
use crate::aggregation::{AggregateFunction, GroupBy, AggregateResult, TimeWindow};
use tonic::Status;
#[derive(Debug, Clone)]
pub struct BatchAggregation {
pub metric_id: String,
pub window_start: i64,
pub window_end: i64,
pub running_sum: f64,
pub running_count: i64,
pub min_value: f64,
pub max_value: f64,
}
#[async_trait]
pub trait StorageBackend: Send + Sync + 'static {
async fn init(&self) -> Result<(), Status>;
async fn insert_metrics(&self, metrics: Vec<MetricRecord>) -> Result<(), Status>;
async fn query_metrics(&self, from_timestamp: i64) -> Result<Vec<MetricRecord>, Status>;
async fn prepare_sql(&self, query: &str) -> Result<Vec<u8>, Status>;
async fn query_sql(&self, statement_handle: &[u8]) -> Result<Vec<MetricRecord>, Status>;
async fn aggregate_metrics(
&self,
function: AggregateFunction,
group_by: &GroupBy,
from_timestamp: i64,
to_timestamp: Option<i64>,
) -> Result<Vec<AggregateResult>, Status>;
fn new_with_options(
connection_string: &str,
options: &HashMap<String, String>,
credentials: Option<&Credentials>,
) -> Result<Self, Status>
where
Self: Sized;
async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status>;
async fn insert_into_table(&self, table_name: &str, batch: RecordBatch) -> Result<(), Status>;
async fn query_table(&self, table_name: &str, projection: Option<Vec<String>>) -> Result<RecordBatch, Status>;
async fn create_aggregation_view(&self, view: &AggregationView) -> Result<(), Status>;
async fn query_aggregation_view(&self, view_name: &str) -> Result<RecordBatch, Status>;
async fn drop_table(&self, table_name: &str) -> Result<(), Status>;
async fn drop_aggregation_view(&self, view_name: &str) -> Result<(), Status>;
fn table_manager(&self) -> &TableManager;
async fn update_batch_aggregations(
&self,
batch: &[MetricRecord],
window: TimeWindow,
) -> Result<Vec<BatchAggregation>, Status> {
let mut aggregations = HashMap::new();
for metric in batch {
let (window_start, window_end) = window.window_bounds(metric.timestamp);
let key = (metric.metric_id.clone(), window_start, window_end);
let agg = aggregations.entry(key).or_insert_with(|| BatchAggregation {
metric_id: metric.metric_id.clone(),
window_start,
window_end,
running_sum: 0.0,
running_count: 0,
min_value: f64::INFINITY,
max_value: f64::NEG_INFINITY,
});
agg.running_sum += metric.value_running_window_sum;
agg.running_count += 1;
agg.min_value = agg.min_value.min(metric.value_running_window_sum);
agg.max_value = agg.max_value.max(metric.value_running_window_sum);
}
Ok(aggregations.into_values().collect())
}
async fn insert_batch_aggregations(
&self,
aggregations: Vec<BatchAggregation>,
) -> Result<(), Status> {
let mut batch = Vec::new();
for agg in aggregations {
batch.push(MetricRecord {
metric_id: agg.metric_id,
timestamp: agg.window_start,
value_running_window_sum: agg.running_sum,
value_running_window_avg: agg.running_sum / agg.running_count as f64,
value_running_window_count: agg.running_count,
});
}
self.insert_metrics(batch).await
}
}