hyprstream_core/storage/
mod.rs1pub mod adbc;
12pub mod duckdb;
13pub mod cache;
14pub mod table_manager;
15
16use arrow_array::RecordBatch;
17use arrow_schema::Schema;
18use async_trait::async_trait;
19use std::collections::HashMap;
20use crate::config::Credentials;
21use crate::metrics::MetricRecord;
22use crate::storage::table_manager::{TableManager, AggregationView};
23use crate::aggregation::{AggregateFunction, GroupBy, AggregateResult, TimeWindow};
24use tonic::Status;
25
/// Running aggregation state for a single metric within one time window.
///
/// Built up incrementally as batches of metric records arrive; the running
/// sum and count let the average be derived later without retaining every
/// sample. `PartialEq` (not `Eq`) because of the `f64` fields.
#[derive(Debug, Clone, PartialEq)]
pub struct BatchAggregation {
    /// Identifier of the metric being aggregated.
    pub metric_id: String,
    /// Start of the aggregation window (units per `TimeWindow::window_bounds` — presumably epoch-based; confirm against `TimeWindow`).
    pub window_start: i64,
    /// End of the aggregation window (same units as `window_start`).
    pub window_end: i64,
    /// Sum of all values folded into this window so far.
    pub running_sum: f64,
    /// Number of values folded into this window so far.
    pub running_count: i64,
    /// Smallest value seen in this window.
    pub min_value: f64,
    /// Largest value seen in this window.
    pub max_value: f64,
}
44
45#[async_trait]
56pub trait StorageBackend: Send + Sync + 'static {
57 async fn init(&self) -> Result<(), Status>;
59
60 async fn insert_metrics(&self, metrics: Vec<MetricRecord>) -> Result<(), Status>;
62
63 async fn query_metrics(&self, from_timestamp: i64) -> Result<Vec<MetricRecord>, Status>;
65
66 async fn prepare_sql(&self, query: &str) -> Result<Vec<u8>, Status>;
69
70 async fn query_sql(&self, statement_handle: &[u8]) -> Result<Vec<MetricRecord>, Status>;
73
74 async fn aggregate_metrics(
76 &self,
77 function: AggregateFunction,
78 group_by: &GroupBy,
79 from_timestamp: i64,
80 to_timestamp: Option<i64>,
81 ) -> Result<Vec<AggregateResult>, Status>;
82
83 fn new_with_options(
86 connection_string: &str,
87 options: &HashMap<String, String>,
88 credentials: Option<&Credentials>,
89 ) -> Result<Self, Status>
90 where
91 Self: Sized;
92
93 async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status>;
95
96 async fn insert_into_table(&self, table_name: &str, batch: RecordBatch) -> Result<(), Status>;
98
99 async fn query_table(&self, table_name: &str, projection: Option<Vec<String>>) -> Result<RecordBatch, Status>;
101
102 async fn create_aggregation_view(&self, view: &AggregationView) -> Result<(), Status>;
104
105 async fn query_aggregation_view(&self, view_name: &str) -> Result<RecordBatch, Status>;
107
108 async fn drop_table(&self, table_name: &str) -> Result<(), Status>;
110
111 async fn drop_aggregation_view(&self, view_name: &str) -> Result<(), Status>;
113
114 fn table_manager(&self) -> &TableManager;
116
117 async fn update_batch_aggregations(
120 &self,
121 batch: &[MetricRecord],
122 window: TimeWindow,
123 ) -> Result<Vec<BatchAggregation>, Status> {
124 let mut aggregations = HashMap::new();
126
127 for metric in batch {
128 let (window_start, window_end) = window.window_bounds(metric.timestamp);
129 let key = (metric.metric_id.clone(), window_start, window_end);
130
131 let agg = aggregations.entry(key).or_insert_with(|| BatchAggregation {
132 metric_id: metric.metric_id.clone(),
133 window_start,
134 window_end,
135 running_sum: 0.0,
136 running_count: 0,
137 min_value: f64::INFINITY,
138 max_value: f64::NEG_INFINITY,
139 });
140
141 agg.running_sum += metric.value_running_window_sum;
143 agg.running_count += 1;
144 agg.min_value = agg.min_value.min(metric.value_running_window_sum);
145 agg.max_value = agg.max_value.max(metric.value_running_window_sum);
146 }
147
148 Ok(aggregations.into_values().collect())
149 }
150
151 async fn insert_batch_aggregations(
154 &self,
155 aggregations: Vec<BatchAggregation>,
156 ) -> Result<(), Status> {
157 let mut batch = Vec::new();
159 for agg in aggregations {
160 batch.push(MetricRecord {
161 metric_id: agg.metric_id,
162 timestamp: agg.window_start,
163 value_running_window_sum: agg.running_sum,
164 value_running_window_avg: agg.running_sum / agg.running_count as f64,
165 value_running_window_count: agg.running_count,
166 });
167 }
168 self.insert_metrics(batch).await
169 }
170}