hyprstream_core/storage/
table_manager.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use arrow_schema::Schema;
use tonic::Status;
use serde::{Serialize, Deserialize};
use crate::aggregation::{TimeWindow, AggregateFunction, GroupBy};

/// Configuration for an aggregation view
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregationView {
    pub source_table: String,
    pub function: AggregateFunction,
    pub group_by: GroupBy,
    pub window: TimeWindow,
    pub aggregate_columns: Vec<String>,
}

#[derive(Debug)]
pub struct TableManager {
    tables: Arc<RwLock<HashMap<String, Schema>>>,
    views: Arc<RwLock<HashMap<String, AggregationView>>>,
}

impl Clone for TableManager {
    fn clone(&self) -> Self {
        Self {
            tables: self.tables.clone(),
            views: self.views.clone(),
        }
    }
}

impl Default for TableManager {
    fn default() -> Self {
        Self::new()
    }
}

impl TableManager {
    pub fn new() -> Self {
        Self {
            tables: Arc::new(RwLock::new(HashMap::new())),
            views: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn create_table(&self, name: String, schema: Schema) -> Result<(), Status> {
        let mut tables = self.tables.write().await;
        if tables.contains_key(&name) {
            return Err(Status::already_exists(format!("Table {} already exists", name)));
        }
        tables.insert(name, schema);
        Ok(())
    }

    pub async fn get_table_schema(&self, name: &str) -> Result<Schema, Status> {
        let tables = self.tables.read().await;
        tables.get(name)
            .cloned()
            .ok_or_else(|| Status::not_found(format!("Table {} not found", name)))
    }

    pub async fn create_aggregation_view(
        &self,
        name: String,
        source_table: String,
        function: AggregateFunction,
        group_by: GroupBy,
        window: TimeWindow,
        aggregate_columns: Vec<String>,
    ) -> Result<(), Status> {
        // Verify source table exists
        {
            let tables = self.tables.read().await;
            if !tables.contains_key(&source_table) {
                return Err(Status::not_found(format!("Source table {} not found", source_table)));
            }

            // Verify aggregate columns exist in source table
            let schema = tables.get(&source_table).unwrap();
            for col in &aggregate_columns {
                if !schema.fields().iter().any(|f| f.name() == col) {
                    return Err(Status::invalid_argument(format!(
                        "Column {} not found in source table {}",
                        col, source_table
                    )));
                }
            }
        }

        let view = AggregationView {
            source_table,
            function,
            group_by,
            window,
            aggregate_columns,
        };

        let mut views = self.views.write().await;
        if views.contains_key(&name) {
            return Err(Status::already_exists(format!("View {} already exists", name)));
        }
        views.insert(name, view);
        Ok(())
    }

    pub async fn get_aggregation_view(&self, name: &str) -> Result<AggregationView, Status> {
        let views = self.views.read().await;
        views.get(name)
            .cloned()
            .ok_or_else(|| Status::not_found(format!("View {} not found", name)))
    }

    pub async fn list_tables(&self) -> Vec<String> {
        let tables = self.tables.read().await;
        tables.keys().cloned().collect()
    }

    pub async fn list_aggregation_views(&self) -> Vec<String> {
        let views = self.views.read().await;
        views.keys().cloned().collect()
    }

    pub async fn drop_table(&self, name: &str) -> Result<(), Status> {
        let mut tables = self.tables.write().await;
        if tables.remove(name).is_none() {
            return Err(Status::not_found(format!("Table {} not found", name)));
        }
        Ok(())
    }

    pub async fn drop_aggregation_view(&self, name: &str) -> Result<(), Status> {
        let mut views = self.views.write().await;
        if views.remove(name).is_none() {
            return Err(Status::not_found(format!("View {} not found", name)));
        }
        Ok(())
    }
}