1use std::collections::HashMap;
4use std::sync::RwLock;
5
6use krishiv_plan::{ExecutionKind, LogicalPlan, NodeOp, PlanNode};
7
8use crate::{SqlError, SqlResult};
9
10#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum LiveTableStatement {
13 Create { name: String, query: String },
14 Refresh { name: String },
15 Drop { name: String },
16}
17
18#[derive(Debug, Default)]
28pub struct LiveTableRegistry {
29 tables: RwLock<HashMap<String, String>>,
30}
31
32impl LiveTableRegistry {
33 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn try_register(
40 &self,
41 name: impl Into<String>,
42 query: impl Into<String>,
43 ) -> Result<bool, SqlError> {
44 let mut tables = self.tables.write().map_err(|_| SqlError::DataFusion {
45 message: "live table registry lock poisoned".into(),
46 })?;
47 let name = name.into();
48 let is_new = !tables.contains_key(&name);
49 tables.insert(name, query.into());
50 Ok(is_new)
51 }
52
53 pub fn register(
54 &self,
55 name: impl Into<String>,
56 query: impl Into<String>,
57 ) -> Result<(), SqlError> {
58 self.try_register(name, query).map(|_| ())
59 }
60
61 pub fn remove_table(&self, name: &str) -> SqlResult<bool> {
62 let mut tables = self.tables.write().map_err(|_| SqlError::DataFusion {
63 message: "live table registry lock poisoned".into(),
64 })?;
65 Ok(tables.remove(name).is_some())
66 }
67
68 pub fn contains(&self, name: &str) -> SqlResult<bool> {
69 let tables = self.tables.read().map_err(|_| SqlError::DataFusion {
70 message: "live table registry lock poisoned".into(),
71 })?;
72 Ok(tables.contains_key(name))
73 }
74
75 pub fn query(&self, name: &str) -> SqlResult<Option<String>> {
76 let tables = self.tables.read().map_err(|_| SqlError::DataFusion {
77 message: "live table registry lock poisoned".into(),
78 })?;
79 Ok(tables.get(name).cloned())
80 }
81}
82
83pub fn parse_live_table_statement(sql: &str) -> SqlResult<Option<LiveTableStatement>> {
85 let trimmed = sql.trim().trim_end_matches(';');
86 let upper = trimmed.to_uppercase();
87
88 if upper.starts_with("CREATE LIVE TABLE ") {
89 let rest =
90 trimmed
91 .get("CREATE LIVE TABLE ".len()..)
92 .ok_or_else(|| SqlError::Unsupported {
93 feature: "CREATE LIVE TABLE".into(),
94 })?;
95 let (name, query) = split_name_and_query(rest)?;
96 return Ok(Some(LiveTableStatement::Create { name, query }));
97 }
98
99 if upper.starts_with("REFRESH LIVE TABLE ") {
100 let name = trimmed
101 .get("REFRESH LIVE TABLE ".len()..)
102 .ok_or_else(|| SqlError::Unsupported {
103 feature: "REFRESH LIVE TABLE".into(),
104 })?
105 .trim()
106 .to_string();
107 if name.is_empty() {
108 return Err(SqlError::EmptyTableName);
109 }
110 return Ok(Some(LiveTableStatement::Refresh { name }));
111 }
112
113 if upper.starts_with("DROP LIVE TABLE ") {
114 let name = trimmed
115 .get("DROP LIVE TABLE ".len()..)
116 .ok_or_else(|| SqlError::Unsupported {
117 feature: "DROP LIVE TABLE".into(),
118 })?
119 .trim()
120 .to_string();
121 if name.is_empty() {
122 return Err(SqlError::EmptyTableName);
123 }
124 return Ok(Some(LiveTableStatement::Drop { name }));
125 }
126
127 Ok(None)
128}
129
130fn split_name_and_query(rest: &str) -> SqlResult<(String, String)> {
131 let upper = rest.to_uppercase();
132 let as_pos = upper.find(" AS ").ok_or_else(|| SqlError::Unsupported {
133 feature: "CREATE LIVE TABLE requires AS <query>".into(),
134 })?;
135 let name = rest[..as_pos].trim().to_string();
136 let query = rest[as_pos + 4..].trim().to_string();
137 if name.is_empty() {
138 return Err(SqlError::EmptyTableName);
139 }
140 if query.is_empty() {
141 return Err(SqlError::EmptyQuery);
142 }
143 Ok((name, query))
144}
145
146pub fn plan_live_table(stmt: LiveTableStatement) -> LogicalPlan {
148 match stmt {
149 LiveTableStatement::Create { name, query } => LogicalPlan::new(
150 format!("create-live-table:{name}"),
151 ExecutionKind::Streaming,
152 )
153 .with_node(
154 PlanNode::new(
155 format!("create-live-{name}"),
156 format!("CREATE LIVE TABLE {name}"),
157 ExecutionKind::Streaming,
158 )
159 .with_op(NodeOp::CreateLiveTable { name, query }),
160 ),
161 LiveTableStatement::Refresh { name } => LogicalPlan::new(
162 format!("refresh-live-table:{name}"),
163 ExecutionKind::Streaming,
164 )
165 .with_node(
166 PlanNode::new(
167 format!("refresh-live-{name}"),
168 format!("REFRESH LIVE TABLE {name}"),
169 ExecutionKind::Streaming,
170 )
171 .with_op(NodeOp::RefreshLiveTable { name }),
172 ),
173 LiveTableStatement::Drop { name } => {
174 LogicalPlan::new(format!("drop-live-table:{name}"), ExecutionKind::Batch).with_node(
175 PlanNode::new(
176 format!("drop-live-{name}"),
177 format!("DROP LIVE TABLE {name}"),
178 ExecutionKind::Batch,
179 )
180 .with_op(NodeOp::DropLiveTable { name }),
181 )
182 }
183 }
184}
185
186pub fn execute_live_table_ddl(
199 registry: &LiveTableRegistry,
200 sql: &str,
201) -> SqlResult<Option<LogicalPlan>> {
202 let Some(stmt) = parse_live_table_statement(sql)? else {
203 return Ok(None);
204 };
205 match &stmt {
206 LiveTableStatement::Create { name, query } => {
207 registry.register(name.clone(), query.clone())?;
208 }
209 LiveTableStatement::Drop { name } => {
210 registry.remove_table(name)?;
211 }
212 LiveTableStatement::Refresh { name } => {
213 let Some(query) = registry.query(name)? else {
214 return Err(SqlError::Unsupported {
215 feature: format!("REFRESH LIVE TABLE {name}: table is not registered"),
216 });
217 };
218 registry.register(name.clone(), query)?;
221 }
222 }
223 Ok(Some(plan_live_table(stmt)))
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229
230 #[test]
231 fn parse_create_live_table() {
232 let stmt = parse_live_table_statement(
233 "CREATE LIVE TABLE orders_summary AS SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id",
234 )
235 .unwrap()
236 .unwrap();
237 match stmt {
238 LiveTableStatement::Create { name, query } => {
239 assert_eq!(name, "orders_summary");
240 assert!(query.contains("SUM(amount)"));
241 }
242 _ => panic!("expected create"),
243 }
244 }
245
246 #[test]
247 fn parse_create_missing_as_errors() {
248 let err = parse_live_table_statement("CREATE LIVE TABLE t SELECT 1").unwrap_err();
249 assert!(matches!(err, SqlError::Unsupported { .. }));
250 }
251
252 #[test]
253 fn parse_refresh_and_drop() {
254 let r = parse_live_table_statement("REFRESH LIVE TABLE orders_summary")
255 .unwrap()
256 .unwrap();
257 assert!(matches!(r, LiveTableStatement::Refresh { .. }));
258 let d = parse_live_table_statement("DROP LIVE TABLE orders_summary")
259 .unwrap()
260 .unwrap();
261 assert!(matches!(d, LiveTableStatement::Drop { .. }));
262 }
263
264 #[test]
265 fn registry_register_and_drop() {
266 let reg = LiveTableRegistry::new();
267 reg.register("v", "SELECT 1");
268 assert!(reg.contains("v").unwrap());
269 reg.remove_table("v").unwrap();
270 assert!(!reg.contains("v").unwrap());
271 }
272
273 #[test]
276 fn execute_live_table_ddl_create_populates_registry_and_returns_streaming_plan() {
277 use krishiv_plan::ExecutionKind;
278
279 let registry = LiveTableRegistry::new();
280 let plan = execute_live_table_ddl(
281 ®istry,
282 "CREATE LIVE TABLE summary AS SELECT id, SUM(val) FROM events GROUP BY id",
283 )
284 .unwrap()
285 .unwrap();
286
287 assert!(
288 registry.contains("summary").unwrap(),
289 "registry must contain the created live table"
290 );
291 assert_eq!(
292 registry.query("summary").unwrap(),
293 Some("SELECT id, SUM(val) FROM events GROUP BY id".to_string()),
294 "registry must store the backing query"
295 );
296 assert_eq!(
297 plan.kind(),
298 ExecutionKind::Streaming,
299 "CREATE LIVE TABLE must produce a Streaming logical plan"
300 );
301 }
302
303 #[test]
304 fn execute_live_table_ddl_drop_removes_from_registry() {
305 let registry = LiveTableRegistry::new();
306 execute_live_table_ddl(®istry, "CREATE LIVE TABLE to_drop AS SELECT 1 AS n").unwrap();
307 assert!(registry.contains("to_drop").unwrap());
308
309 execute_live_table_ddl(®istry, "DROP LIVE TABLE to_drop").unwrap();
310 assert!(
311 !registry.contains("to_drop").unwrap(),
312 "dropped table must be removed from registry"
313 );
314 }
315
316 #[test]
317 fn execute_live_table_ddl_refresh_returns_plan_without_error() {
318 let registry = LiveTableRegistry::new();
319 execute_live_table_ddl(®istry, "CREATE LIVE TABLE to_refresh AS SELECT 1 AS x").unwrap();
320 let plan = execute_live_table_ddl(®istry, "REFRESH LIVE TABLE to_refresh")
321 .unwrap()
322 .expect("REFRESH must return a plan");
323 assert!(
324 !plan.nodes().is_empty(),
325 "REFRESH plan must have at least one node"
326 );
327 }
328
329 #[test]
330 fn execute_live_table_ddl_non_live_table_sql_returns_none() {
331 let registry = LiveTableRegistry::new();
332 let result = execute_live_table_ddl(®istry, "SELECT 1 AS n").unwrap();
333 assert!(
334 result.is_none(),
335 "non-live-table SQL must return None from execute_live_table_ddl"
336 );
337 }
338
339 #[test]
340 fn execute_live_table_ddl_refresh_unregistered_table_errors() {
341 let registry = LiveTableRegistry::new();
342 let err = execute_live_table_ddl(®istry, "REFRESH LIVE TABLE missing")
346 .expect_err("REFRESH on an unknown table must fail");
347 match err {
348 crate::SqlError::Unsupported { feature } => {
349 assert!(
350 feature.contains("missing"),
351 "error should name the missing table; got {feature}"
352 );
353 }
354 other => panic!("expected Unsupported, got {other:?}"),
355 }
356 }
357
358 #[test]
359 fn execute_live_table_ddl_refresh_registered_table_succeeds() {
360 let registry = LiveTableRegistry::new();
361 execute_live_table_ddl(®istry, "CREATE LIVE TABLE t AS SELECT 1").unwrap();
362 let plan = execute_live_table_ddl(®istry, "REFRESH LIVE TABLE t")
364 .unwrap()
365 .unwrap();
366 assert!(!plan.nodes().is_empty());
367 }
368}