hyprstream_core/storage/
table_manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use arrow_schema::Schema;
5use tonic::Status;
6use serde::{Serialize, Deserialize};
7use crate::aggregation::{TimeWindow, AggregateFunction, GroupBy};
8
9/// Configuration for an aggregation view
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct AggregationView {
12    pub source_table: String,
13    pub function: AggregateFunction,
14    pub group_by: GroupBy,
15    pub window: TimeWindow,
16    pub aggregate_columns: Vec<String>,
17}
18
19#[derive(Debug)]
20pub struct TableManager {
21    tables: Arc<RwLock<HashMap<String, Schema>>>,
22    views: Arc<RwLock<HashMap<String, AggregationView>>>,
23}
24
25impl Clone for TableManager {
26    fn clone(&self) -> Self {
27        Self {
28            tables: self.tables.clone(),
29            views: self.views.clone(),
30        }
31    }
32}
33
34impl Default for TableManager {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl TableManager {
41    pub fn new() -> Self {
42        Self {
43            tables: Arc::new(RwLock::new(HashMap::new())),
44            views: Arc::new(RwLock::new(HashMap::new())),
45        }
46    }
47
48    pub async fn create_table(&self, name: String, schema: Schema) -> Result<(), Status> {
49        let mut tables = self.tables.write().await;
50        if tables.contains_key(&name) {
51            return Err(Status::already_exists(format!("Table {} already exists", name)));
52        }
53        tables.insert(name, schema);
54        Ok(())
55    }
56
57    pub async fn get_table_schema(&self, name: &str) -> Result<Schema, Status> {
58        let tables = self.tables.read().await;
59        tables.get(name)
60            .cloned()
61            .ok_or_else(|| Status::not_found(format!("Table {} not found", name)))
62    }
63
64    pub async fn create_aggregation_view(
65        &self,
66        name: String,
67        source_table: String,
68        function: AggregateFunction,
69        group_by: GroupBy,
70        window: TimeWindow,
71        aggregate_columns: Vec<String>,
72    ) -> Result<(), Status> {
73        // Verify source table exists
74        {
75            let tables = self.tables.read().await;
76            if !tables.contains_key(&source_table) {
77                return Err(Status::not_found(format!("Source table {} not found", source_table)));
78            }
79
80            // Verify aggregate columns exist in source table
81            let schema = tables.get(&source_table).unwrap();
82            for col in &aggregate_columns {
83                if !schema.fields().iter().any(|f| f.name() == col) {
84                    return Err(Status::invalid_argument(format!(
85                        "Column {} not found in source table {}",
86                        col, source_table
87                    )));
88                }
89            }
90        }
91
92        let view = AggregationView {
93            source_table,
94            function,
95            group_by,
96            window,
97            aggregate_columns,
98        };
99
100        let mut views = self.views.write().await;
101        if views.contains_key(&name) {
102            return Err(Status::already_exists(format!("View {} already exists", name)));
103        }
104        views.insert(name, view);
105        Ok(())
106    }
107
108    pub async fn get_aggregation_view(&self, name: &str) -> Result<AggregationView, Status> {
109        let views = self.views.read().await;
110        views.get(name)
111            .cloned()
112            .ok_or_else(|| Status::not_found(format!("View {} not found", name)))
113    }
114
115    pub async fn list_tables(&self) -> Vec<String> {
116        let tables = self.tables.read().await;
117        tables.keys().cloned().collect()
118    }
119
120    pub async fn list_aggregation_views(&self) -> Vec<String> {
121        let views = self.views.read().await;
122        views.keys().cloned().collect()
123    }
124
125    pub async fn drop_table(&self, name: &str) -> Result<(), Status> {
126        let mut tables = self.tables.write().await;
127        if tables.remove(name).is_none() {
128            return Err(Status::not_found(format!("Table {} not found", name)));
129        }
130        Ok(())
131    }
132
133    pub async fn drop_aggregation_view(&self, name: &str) -> Result<(), Status> {
134        let mut views = self.views.write().await;
135        if views.remove(name).is_none() {
136            return Err(Status::not_found(format!("View {} not found", name)));
137        }
138        Ok(())
139    }
140}