Skip to main content

krishiv_sql/
live_table.rs

1//! `CREATE LIVE TABLE` SQL extensions (R14 S1.1).
2
3use std::collections::HashMap;
4use std::sync::RwLock;
5
6use krishiv_plan::{ExecutionKind, LogicalPlan, NodeOp, PlanNode};
7
8use crate::{SqlError, SqlResult};
9
/// A parsed live-table DDL statement.
///
/// Values are produced by [`parse_live_table_statement`] and turned into a
/// logical plan by [`plan_live_table`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LiveTableStatement {
    /// `CREATE LIVE TABLE <name> AS <query>`.
    Create {
        /// Table name, with surrounding whitespace removed.
        name: String,
        /// Text following `AS`, with surrounding whitespace removed.
        query: String,
    },
    /// `REFRESH LIVE TABLE <name>`.
    Refresh {
        /// Table name, with surrounding whitespace removed.
        name: String,
    },
    /// `DROP LIVE TABLE <name>`.
    Drop {
        /// Table name, with surrounding whitespace removed.
        name: String,
    },
}
17
18/// Registry of active live tables and their backing queries.
19///
20/// Internally guarded by an `RwLock` so concurrent `query`/`contains`
21/// calls (the common case for `REFRESH LIVE TABLE` checks and executor
22/// query lookups) do not serialise against each other. Writes
23/// (`register`/`remove_table`) take the write lock. This avoids the
24/// `Mutex<LiveTableRegistry>` contention seen under fan-out of
25/// parallel `SELECT` against live tables in a shared
26/// `DataFusion` `SessionContext`.
27#[derive(Debug, Default)]
28pub struct LiveTableRegistry {
29    tables: RwLock<HashMap<String, String>>,
30}
31
32impl LiveTableRegistry {
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    /// Returns `true` if the write lock would succeed without blocking.
38    /// Callers using `RwLock` semantics may use this to diagnose stalls.
39    pub fn try_register(
40        &self,
41        name: impl Into<String>,
42        query: impl Into<String>,
43    ) -> Result<bool, SqlError> {
44        let mut tables = self.tables.write().map_err(|_| SqlError::DataFusion {
45            message: "live table registry lock poisoned".into(),
46        })?;
47        let name = name.into();
48        let is_new = !tables.contains_key(&name);
49        tables.insert(name, query.into());
50        Ok(is_new)
51    }
52
53    pub fn register(
54        &self,
55        name: impl Into<String>,
56        query: impl Into<String>,
57    ) -> Result<(), SqlError> {
58        self.try_register(name, query).map(|_| ())
59    }
60
61    pub fn remove_table(&self, name: &str) -> SqlResult<bool> {
62        let mut tables = self.tables.write().map_err(|_| SqlError::DataFusion {
63            message: "live table registry lock poisoned".into(),
64        })?;
65        Ok(tables.remove(name).is_some())
66    }
67
68    pub fn contains(&self, name: &str) -> SqlResult<bool> {
69        let tables = self.tables.read().map_err(|_| SqlError::DataFusion {
70            message: "live table registry lock poisoned".into(),
71        })?;
72        Ok(tables.contains_key(name))
73    }
74
75    pub fn query(&self, name: &str) -> SqlResult<Option<String>> {
76        let tables = self.tables.read().map_err(|_| SqlError::DataFusion {
77            message: "live table registry lock poisoned".into(),
78        })?;
79        Ok(tables.get(name).cloned())
80    }
81}
82
83/// Parse `CREATE|REFRESH|DROP LIVE TABLE` statements.
84pub fn parse_live_table_statement(sql: &str) -> SqlResult<Option<LiveTableStatement>> {
85    let trimmed = sql.trim().trim_end_matches(';');
86    let upper = trimmed.to_uppercase();
87
88    if upper.starts_with("CREATE LIVE TABLE ") {
89        let rest =
90            trimmed
91                .get("CREATE LIVE TABLE ".len()..)
92                .ok_or_else(|| SqlError::Unsupported {
93                    feature: "CREATE LIVE TABLE".into(),
94                })?;
95        let (name, query) = split_name_and_query(rest)?;
96        return Ok(Some(LiveTableStatement::Create { name, query }));
97    }
98
99    if upper.starts_with("REFRESH LIVE TABLE ") {
100        let name = trimmed
101            .get("REFRESH LIVE TABLE ".len()..)
102            .ok_or_else(|| SqlError::Unsupported {
103                feature: "REFRESH LIVE TABLE".into(),
104            })?
105            .trim()
106            .to_string();
107        if name.is_empty() {
108            return Err(SqlError::EmptyTableName);
109        }
110        return Ok(Some(LiveTableStatement::Refresh { name }));
111    }
112
113    if upper.starts_with("DROP LIVE TABLE ") {
114        let name = trimmed
115            .get("DROP LIVE TABLE ".len()..)
116            .ok_or_else(|| SqlError::Unsupported {
117                feature: "DROP LIVE TABLE".into(),
118            })?
119            .trim()
120            .to_string();
121        if name.is_empty() {
122            return Err(SqlError::EmptyTableName);
123        }
124        return Ok(Some(LiveTableStatement::Drop { name }));
125    }
126
127    Ok(None)
128}
129
130fn split_name_and_query(rest: &str) -> SqlResult<(String, String)> {
131    let upper = rest.to_uppercase();
132    let as_pos = upper.find(" AS ").ok_or_else(|| SqlError::Unsupported {
133        feature: "CREATE LIVE TABLE requires AS <query>".into(),
134    })?;
135    let name = rest[..as_pos].trim().to_string();
136    let query = rest[as_pos + 4..].trim().to_string();
137    if name.is_empty() {
138        return Err(SqlError::EmptyTableName);
139    }
140    if query.is_empty() {
141        return Err(SqlError::EmptyQuery);
142    }
143    Ok((name, query))
144}
145
146/// Build a Krishiv logical plan for a live-table DDL statement.
147pub fn plan_live_table(stmt: LiveTableStatement) -> LogicalPlan {
148    match stmt {
149        LiveTableStatement::Create { name, query } => LogicalPlan::new(
150            format!("create-live-table:{name}"),
151            ExecutionKind::Streaming,
152        )
153        .with_node(
154            PlanNode::new(
155                format!("create-live-{name}"),
156                format!("CREATE LIVE TABLE {name}"),
157                ExecutionKind::Streaming,
158            )
159            .with_op(NodeOp::CreateLiveTable { name, query }),
160        ),
161        LiveTableStatement::Refresh { name } => LogicalPlan::new(
162            format!("refresh-live-table:{name}"),
163            ExecutionKind::Streaming,
164        )
165        .with_node(
166            PlanNode::new(
167                format!("refresh-live-{name}"),
168                format!("REFRESH LIVE TABLE {name}"),
169                ExecutionKind::Streaming,
170            )
171            .with_op(NodeOp::RefreshLiveTable { name }),
172        ),
173        LiveTableStatement::Drop { name } => {
174            LogicalPlan::new(format!("drop-live-table:{name}"), ExecutionKind::Batch).with_node(
175                PlanNode::new(
176                    format!("drop-live-{name}"),
177                    format!("DROP LIVE TABLE {name}"),
178                    ExecutionKind::Batch,
179                )
180                .with_op(NodeOp::DropLiveTable { name }),
181            )
182        }
183    }
184}
185
186/// Apply a live-table statement to the registry and return its logical plan.
187///
188/// `REFRESH LIVE TABLE <name>` looks up the existing query in the
189/// registry and re-registers it (which is the registry-level half of
190/// "refresh" — the executor is expected to re-execute the plan to
191/// materialise the new result). If the named table is not registered,
192/// the refresh is rejected with `SqlError::Unsupported` so callers see a
193/// clear error rather than a silent no-op.
194///
195/// The registry is `&LiveTableRegistry` (not `&Mutex<...>`); internal
196/// synchronisation is the registry's responsibility. Callers that
197/// already hold a `Mutex<LiveTableRegistry>` can pass `&*guard`.
198pub fn execute_live_table_ddl(
199    registry: &LiveTableRegistry,
200    sql: &str,
201) -> SqlResult<Option<LogicalPlan>> {
202    let Some(stmt) = parse_live_table_statement(sql)? else {
203        return Ok(None);
204    };
205    match &stmt {
206        LiveTableStatement::Create { name, query } => {
207            registry.register(name.clone(), query.clone())?;
208        }
209        LiveTableStatement::Drop { name } => {
210            registry.remove_table(name)?;
211        }
212        LiveTableStatement::Refresh { name } => {
213            let Some(query) = registry.query(name)? else {
214                return Err(SqlError::Unsupported {
215                    feature: format!("REFRESH LIVE TABLE {name}: table is not registered"),
216                });
217            };
218            // Re-register the same query to bump any "last refresh" bookkeeping
219            // and force the executor to re-materialise the result.
220            registry.register(name.clone(), query)?;
221        }
222    }
223    Ok(Some(plan_live_table(stmt)))
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_create_live_table() {
        let stmt = parse_live_table_statement(
            "CREATE LIVE TABLE orders_summary AS SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id",
        )
        .unwrap()
        .unwrap();
        match stmt {
            LiveTableStatement::Create { name, query } => {
                assert_eq!(name, "orders_summary");
                assert!(query.contains("SUM(amount)"));
            }
            _ => panic!("expected create"),
        }
    }

    #[test]
    fn parse_create_missing_as_errors() {
        let err = parse_live_table_statement("CREATE LIVE TABLE t SELECT 1").unwrap_err();
        assert!(matches!(err, SqlError::Unsupported { .. }));
    }

    #[test]
    fn parse_refresh_and_drop() {
        let r = parse_live_table_statement("REFRESH LIVE TABLE orders_summary")
            .unwrap()
            .unwrap();
        assert!(matches!(r, LiveTableStatement::Refresh { .. }));
        let d = parse_live_table_statement("DROP LIVE TABLE orders_summary")
            .unwrap()
            .unwrap();
        assert!(matches!(d, LiveTableStatement::Drop { .. }));
    }

    #[test]
    fn registry_register_and_drop() {
        let reg = LiveTableRegistry::new();
        // `register` returns a `Result`; unwrap it so a failed registration
        // fails the test here instead of being silently discarded.
        reg.register("v", "SELECT 1").unwrap();
        assert!(reg.contains("v").unwrap());
        reg.remove_table("v").unwrap();
        assert!(!reg.contains("v").unwrap());
    }

    #[test]
    fn registry_try_register_reports_whether_name_is_new() {
        let reg = LiveTableRegistry::new();
        assert!(
            reg.try_register("v", "SELECT 1").unwrap(),
            "first registration of a name must report it as new"
        );
        assert!(
            !reg.try_register("v", "SELECT 2").unwrap(),
            "re-registering an existing name must report it as replaced"
        );
        assert_eq!(reg.query("v").unwrap(), Some("SELECT 2".to_string()));
    }

    // ── execute_live_table_ddl integration ────────────────────────────────────

    #[test]
    fn execute_live_table_ddl_create_populates_registry_and_returns_streaming_plan() {
        use krishiv_plan::ExecutionKind;

        let registry = LiveTableRegistry::new();
        let plan = execute_live_table_ddl(
            &registry,
            "CREATE LIVE TABLE summary AS SELECT id, SUM(val) FROM events GROUP BY id",
        )
        .unwrap()
        .unwrap();

        assert!(
            registry.contains("summary").unwrap(),
            "registry must contain the created live table"
        );
        assert_eq!(
            registry.query("summary").unwrap(),
            Some("SELECT id, SUM(val) FROM events GROUP BY id".to_string()),
            "registry must store the backing query"
        );
        assert_eq!(
            plan.kind(),
            ExecutionKind::Streaming,
            "CREATE LIVE TABLE must produce a Streaming logical plan"
        );
    }

    #[test]
    fn execute_live_table_ddl_drop_removes_from_registry() {
        let registry = LiveTableRegistry::new();
        execute_live_table_ddl(&registry, "CREATE LIVE TABLE to_drop AS SELECT 1 AS n").unwrap();
        assert!(registry.contains("to_drop").unwrap());

        execute_live_table_ddl(&registry, "DROP LIVE TABLE to_drop").unwrap();
        assert!(
            !registry.contains("to_drop").unwrap(),
            "dropped table must be removed from registry"
        );
    }

    #[test]
    fn execute_live_table_ddl_refresh_returns_plan_without_error() {
        let registry = LiveTableRegistry::new();
        execute_live_table_ddl(&registry, "CREATE LIVE TABLE to_refresh AS SELECT 1 AS x").unwrap();
        let plan = execute_live_table_ddl(&registry, "REFRESH LIVE TABLE to_refresh")
            .unwrap()
            .expect("REFRESH must return a plan");
        assert!(
            !plan.nodes().is_empty(),
            "REFRESH plan must have at least one node"
        );
    }

    #[test]
    fn execute_live_table_ddl_non_live_table_sql_returns_none() {
        let registry = LiveTableRegistry::new();
        let result = execute_live_table_ddl(&registry, "SELECT 1 AS n").unwrap();
        assert!(
            result.is_none(),
            "non-live-table SQL must return None from execute_live_table_ddl"
        );
    }

    #[test]
    fn execute_live_table_ddl_refresh_unregistered_table_errors() {
        let registry = LiveTableRegistry::new();
        // REFRESH on a table that has never been CREATEd must error, not
        // silently no-op (the previous behaviour silently dropped the
        // refresh and returned a plan).
        let err = execute_live_table_ddl(&registry, "REFRESH LIVE TABLE missing")
            .expect_err("REFRESH on an unknown table must fail");
        match err {
            crate::SqlError::Unsupported { feature } => {
                assert!(
                    feature.contains("missing"),
                    "error should name the missing table; got {feature}"
                );
            }
            other => panic!("expected Unsupported, got {other:?}"),
        }
    }

    #[test]
    fn execute_live_table_ddl_refresh_registered_table_succeeds() {
        let registry = LiveTableRegistry::new();
        execute_live_table_ddl(&registry, "CREATE LIVE TABLE t AS SELECT 1").unwrap();
        // REFRESH on a registered table must succeed and return a plan.
        let plan = execute_live_table_ddl(&registry, "REFRESH LIVE TABLE t")
            .unwrap()
            .unwrap();
        assert!(!plan.nodes().is_empty());
    }
}