hyprstream_core/storage/
table_manager.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use arrow_schema::Schema;
5use tonic::Status;
6use serde::{Serialize, Deserialize};
7use crate::aggregation::{TimeWindow, AggregateFunction, GroupBy};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct AggregationView {
12 pub source_table: String,
13 pub function: AggregateFunction,
14 pub group_by: GroupBy,
15 pub window: TimeWindow,
16 pub aggregate_columns: Vec<String>,
17}
18
19#[derive(Debug)]
20pub struct TableManager {
21 tables: Arc<RwLock<HashMap<String, Schema>>>,
22 views: Arc<RwLock<HashMap<String, AggregationView>>>,
23}
24
25impl Clone for TableManager {
26 fn clone(&self) -> Self {
27 Self {
28 tables: self.tables.clone(),
29 views: self.views.clone(),
30 }
31 }
32}
33
34impl Default for TableManager {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl TableManager {
41 pub fn new() -> Self {
42 Self {
43 tables: Arc::new(RwLock::new(HashMap::new())),
44 views: Arc::new(RwLock::new(HashMap::new())),
45 }
46 }
47
48 pub async fn create_table(&self, name: String, schema: Schema) -> Result<(), Status> {
49 let mut tables = self.tables.write().await;
50 if tables.contains_key(&name) {
51 return Err(Status::already_exists(format!("Table {} already exists", name)));
52 }
53 tables.insert(name, schema);
54 Ok(())
55 }
56
57 pub async fn get_table_schema(&self, name: &str) -> Result<Schema, Status> {
58 let tables = self.tables.read().await;
59 tables.get(name)
60 .cloned()
61 .ok_or_else(|| Status::not_found(format!("Table {} not found", name)))
62 }
63
64 pub async fn create_aggregation_view(
65 &self,
66 name: String,
67 source_table: String,
68 function: AggregateFunction,
69 group_by: GroupBy,
70 window: TimeWindow,
71 aggregate_columns: Vec<String>,
72 ) -> Result<(), Status> {
73 {
75 let tables = self.tables.read().await;
76 if !tables.contains_key(&source_table) {
77 return Err(Status::not_found(format!("Source table {} not found", source_table)));
78 }
79
80 let schema = tables.get(&source_table).unwrap();
82 for col in &aggregate_columns {
83 if !schema.fields().iter().any(|f| f.name() == col) {
84 return Err(Status::invalid_argument(format!(
85 "Column {} not found in source table {}",
86 col, source_table
87 )));
88 }
89 }
90 }
91
92 let view = AggregationView {
93 source_table,
94 function,
95 group_by,
96 window,
97 aggregate_columns,
98 };
99
100 let mut views = self.views.write().await;
101 if views.contains_key(&name) {
102 return Err(Status::already_exists(format!("View {} already exists", name)));
103 }
104 views.insert(name, view);
105 Ok(())
106 }
107
108 pub async fn get_aggregation_view(&self, name: &str) -> Result<AggregationView, Status> {
109 let views = self.views.read().await;
110 views.get(name)
111 .cloned()
112 .ok_or_else(|| Status::not_found(format!("View {} not found", name)))
113 }
114
115 pub async fn list_tables(&self) -> Vec<String> {
116 let tables = self.tables.read().await;
117 tables.keys().cloned().collect()
118 }
119
120 pub async fn list_aggregation_views(&self) -> Vec<String> {
121 let views = self.views.read().await;
122 views.keys().cloned().collect()
123 }
124
125 pub async fn drop_table(&self, name: &str) -> Result<(), Status> {
126 let mut tables = self.tables.write().await;
127 if tables.remove(name).is_none() {
128 return Err(Status::not_found(format!("Table {} not found", name)));
129 }
130 Ok(())
131 }
132
133 pub async fn drop_aggregation_view(&self, name: &str) -> Result<(), Status> {
134 let mut views = self.views.write().await;
135 if views.remove(name).is_none() {
136 return Err(Status::not_found(format!("View {} not found", name)));
137 }
138 Ok(())
139 }
140}