hyprstream_core/storage/
mod.rs

//! Storage backends for metric data persistence and caching.
//!
//! This module provides multiple storage backend implementations:
//! - `duckdb`: High-performance embedded database for caching and local storage
//! - `adbc`: Arrow Database Connectivity for external database integration
//! - `cache`: Two-tier storage with a configurable caching layer
//! - `table_manager`: Management of tables and aggregation views
//!
//! Each backend implements the `StorageBackend` trait, providing a consistent
//! interface for metric storage and retrieval operations.
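//!
//! # Example
//!
//! A minimal sketch of driving a backend through the `StorageBackend` trait,
//! generic over the concrete backend (assumes an async context; the metric ID,
//! values, and timestamps below are illustrative):
//!
//! ```ignore
//! use crate::metrics::MetricRecord;
//! use crate::storage::StorageBackend;
//! use tonic::Status;
//!
//! async fn write_and_read<B: StorageBackend>(backend: &B) -> Result<(), Status> {
//!     backend.init().await?;
//!     backend.insert_metrics(vec![MetricRecord {
//!         metric_id: "cpu_usage".to_string(),
//!         timestamp: 1_700_000_000,
//!         value_running_window_sum: 42.0,
//!         value_running_window_avg: 42.0,
//!         value_running_window_count: 1,
//!     }]).await?;
//!     let recent = backend.query_metrics(1_699_999_000).await?;
//!     println!("fetched {} metric records", recent.len());
//!     Ok(())
//! }
//! ```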

pub mod adbc;
pub mod duckdb;
pub mod cache;
pub mod table_manager;

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use std::collections::HashMap;
use crate::config::Credentials;
use crate::metrics::MetricRecord;
use crate::storage::table_manager::{TableManager, AggregationView};
use crate::aggregation::{AggregateFunction, GroupBy, AggregateResult, TimeWindow};
use tonic::Status;

/// Batch-level aggregation state for efficient updates.
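///
/// A sketch of how two aggregations for the same metric and window could be
/// merged (illustrative only; such a helper is not part of this module):
///
/// ```ignore
/// fn merge(a: &BatchAggregation, b: &BatchAggregation) -> BatchAggregation {
///     BatchAggregation {
///         metric_id: a.metric_id.clone(),
///         window_start: a.window_start,
///         window_end: a.window_end,
///         running_sum: a.running_sum + b.running_sum,
///         running_count: a.running_count + b.running_count,
///         min_value: a.min_value.min(b.min_value),
///         max_value: a.max_value.max(b.max_value),
///     }
/// }
/// ```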
#[derive(Debug, Clone)]
pub struct BatchAggregation {
    /// The metric ID this aggregation belongs to
    pub metric_id: String,
    /// Start of the time window
    pub window_start: i64,
    /// End of the time window
    pub window_end: i64,
    /// Running sum within the window
    pub running_sum: f64,
    /// Running count within the window
    pub running_count: i64,
    /// Minimum value in the window
    pub min_value: f64,
    /// Maximum value in the window
    pub max_value: f64,
}

/// Storage backend trait for metric data persistence.
///
/// This trait defines the interface that all storage backends must implement.
/// It provides methods for:
/// - Initialization and configuration
/// - Metric data insertion
/// - Metric data querying
/// - SQL query preparation and execution
/// - Aggregation of metrics
/// - Table and view management
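///
/// # Example
///
/// A sketch of the table-level API (the table name and field names are
/// illustrative; assumes an async context and an existing Arrow `RecordBatch`):
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![
///     Field::new("metric_id", DataType::Utf8, false),
///     Field::new("timestamp", DataType::Int64, false),
/// ]);
/// backend.create_table("metrics_raw", &schema).await?;
/// backend.insert_into_table("metrics_raw", batch).await?;
/// let all_rows = backend.query_table("metrics_raw", None).await?;
/// ```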
#[async_trait]
pub trait StorageBackend: Send + Sync + 'static {
    /// Initialize the storage backend.
    async fn init(&self) -> Result<(), Status>;

    /// Insert metrics into storage.
    async fn insert_metrics(&self, metrics: Vec<MetricRecord>) -> Result<(), Status>;

    /// Query metrics from storage.
    async fn query_metrics(&self, from_timestamp: i64) -> Result<Vec<MetricRecord>, Status>;

    /// Prepare a SQL query and return a handle.
    /// The handle is backend-specific and opaque to the caller.
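    ///
    /// A sketch of the prepare/execute round trip (the SQL text is illustrative;
    /// assumes an async context):
    ///
    /// ```ignore
    /// let handle = backend.prepare_sql("SELECT * FROM metrics WHERE timestamp > 0").await?;
    /// let rows = backend.query_sql(&handle).await?;
    /// ```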
    async fn prepare_sql(&self, query: &str) -> Result<Vec<u8>, Status>;

    /// Execute a prepared SQL query using its handle.
    /// The handle must have been obtained from `prepare_sql`.
    async fn query_sql(&self, statement_handle: &[u8]) -> Result<Vec<MetricRecord>, Status>;

    /// Aggregate metrics using the specified function and grouping.
    async fn aggregate_metrics(
        &self,
        function: AggregateFunction,
        group_by: &GroupBy,
        from_timestamp: i64,
        to_timestamp: Option<i64>,
    ) -> Result<Vec<AggregateResult>, Status>;

    /// Create a new instance with the given options.
    /// The connection string and options are backend-specific.
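    ///
    /// A sketch of constructing some backend type `B: StorageBackend` (the
    /// connection string and option key are illustrative, not documented
    /// configuration):
    ///
    /// ```ignore
    /// let mut options = HashMap::new();
    /// options.insert("pool_size".to_string(), "4".to_string());
    /// let backend = B::new_with_options("backend-specific-connection-string", &options, None)?;
    /// ```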
    fn new_with_options(
        connection_string: &str,
        options: &HashMap<String, String>,
        credentials: Option<&Credentials>,
    ) -> Result<Self, Status>
    where
        Self: Sized;

    /// Create a new table with the given schema.
    async fn create_table(&self, table_name: &str, schema: &Schema) -> Result<(), Status>;

    /// Insert data into a table.
    async fn insert_into_table(&self, table_name: &str, batch: RecordBatch) -> Result<(), Status>;

    /// Query data from a table.
    async fn query_table(&self, table_name: &str, projection: Option<Vec<String>>) -> Result<RecordBatch, Status>;

    /// Create an aggregation view.
    async fn create_aggregation_view(&self, view: &AggregationView) -> Result<(), Status>;

    /// Query data from an aggregation view.
    async fn query_aggregation_view(&self, view_name: &str) -> Result<RecordBatch, Status>;

    /// Drop a table.
    async fn drop_table(&self, table_name: &str) -> Result<(), Status>;

    /// Drop an aggregation view.
    async fn drop_aggregation_view(&self, view_name: &str) -> Result<(), Status>;

    /// Get the table manager instance.
    fn table_manager(&self) -> &TableManager;

    /// Update batch-level aggregations.
    /// This is called during batch writes to maintain running aggregations.
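    ///
    /// Typically paired with `insert_batch_aggregations`; a sketch (assumes an
    /// async context and an existing `TimeWindow` value):
    ///
    /// ```ignore
    /// let aggregations = backend.update_batch_aggregations(&batch, window).await?;
    /// backend.insert_batch_aggregations(aggregations).await?;
    /// ```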
    async fn update_batch_aggregations(
        &self,
        batch: &[MetricRecord],
        window: TimeWindow,
    ) -> Result<Vec<BatchAggregation>, Status> {
        // Default implementation: fold each record into a per-(metric_id, window) aggregation.
        let mut aggregations = HashMap::new();

        for metric in batch {
            let (window_start, window_end) = window.window_bounds(metric.timestamp);
            let key = (metric.metric_id.clone(), window_start, window_end);

            let agg = aggregations.entry(key).or_insert_with(|| BatchAggregation {
                metric_id: metric.metric_id.clone(),
                window_start,
                window_end,
                running_sum: 0.0,
                running_count: 0,
                min_value: f64::INFINITY,
                max_value: f64::NEG_INFINITY,
            });

            // Update the running sum, count, and min/max for this window.
            agg.running_sum += metric.value_running_window_sum;
            agg.running_count += 1;
            agg.min_value = agg.min_value.min(metric.value_running_window_sum);
            agg.max_value = agg.max_value.max(metric.value_running_window_sum);
        }

        Ok(aggregations.into_values().collect())
    }

    /// Insert batch-level aggregations.
    /// This is called after `update_batch_aggregations` to persist the aggregations.
    async fn insert_batch_aggregations(
        &self,
        aggregations: Vec<BatchAggregation>,
    ) -> Result<(), Status> {
        // Default implementation: convert each aggregation into a MetricRecord keyed by its
        // window start and persist it via insert_metrics. Aggregations produced by
        // update_batch_aggregations always have running_count >= 1, so the average below is
        // well-defined.
        let mut batch = Vec::new();
        for agg in aggregations {
            batch.push(MetricRecord {
                metric_id: agg.metric_id,
                timestamp: agg.window_start,
                value_running_window_sum: agg.running_sum,
                value_running_window_avg: agg.running_sum / agg.running_count as f64,
                value_running_window_count: agg.running_count,
            });
        }
        self.insert_metrics(batch).await
    }
}