Skip to main content

krishiv_sql/
pipeline_ddl.rs

//! Declarative pipeline DDL: `CREATE SOURCE` / `CREATE SINK` / `START PIPELINE` /
//! `REFRESH PIPELINE`.
//!
//! This is the SQL surface for the Tier-2 pipeline layer, following the same
//! lightweight prefix-match approach as [`crate::incremental_view`]. It is a
//! **metadata layer only**: it parses and registers source/sink declarations.
//! Execution of `START PIPELINE` and `REFRESH PIPELINE` happens in `krishiv-api`,
//! which resolves the registered specs against a
//! [`Session`](../../krishiv_api/struct.Session.html) and compiles them to
//! `session.pipeline()…run()`.
//!
//! # Grammar
//!
//! ```sql
//! CREATE SOURCE orders AS SELECT * FROM orders_raw;          -- bounded query source
//! CREATE SOURCE events FROM PARQUET(path='/data/e.parquet'); -- connector source
//! CREATE INCREMENTAL VIEW revenue AS SELECT ... FROM orders; -- transform (see incremental_view)
//! CREATE SINK revenue_out FROM revenue;                      -- collect view output
//! CREATE SINK revenue_pq FROM revenue INTO PARQUET(path='/out/r.parquet'); -- write via connector
//! START PIPELINE revenue_out;                                -- run; returns sink output
//! REFRESH PIPELINE revenue_out [FULL];                       -- re-run; FULL resets state first
//! DROP SOURCE orders;  DROP SINK revenue_out;
//! ```
19
20use std::collections::HashMap;
21use std::sync::RwLock;
22
23use crate::{SqlError, SqlResult};
24
25// ── Parsed statement ────────────────────────────────────────────────────────
26
/// A parsed pipeline DDL statement, as produced by [`parse_pipeline_statement`].
///
/// [`execute_pipeline_ddl`] applies the `Create*` / `Drop*` variants to a
/// [`PipelineRegistry`]; `StartPipeline` and `RefreshPipeline` are not handled
/// there and are left for the `krishiv-api` execution layer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineStatement {
    /// `CREATE SOURCE <name> AS <query>` or `... FROM <CONNECTOR>(...)`.
    CreateSource { name: String, source: SourceSpec },
    /// `CREATE SINK <name> FROM <view> [INTO <CONNECTOR>(...)]`.
    ///
    /// `connector` is `None` when the `INTO` clause is omitted.
    CreateSink {
        name: String,
        view: String,
        connector: Option<ConnectorSpec>,
    },
    /// `START PIPELINE <sink_name>` — run the pipeline feeding `sink_name`.
    StartPipeline { sink: String },
    /// `REFRESH PIPELINE <sink_name> [FULL]` — re-run a pipeline; `full` (set when
    /// the trailing `FULL` keyword is present) resets its persisted state first
    /// (Spark SDP `--full-refresh`).
    RefreshPipeline { sink: String, full: bool },
    /// `DROP SOURCE <name>`.
    DropSource { name: String },
    /// `DROP SINK <name>`.
    DropSink { name: String },
}
48
49// ── Registry ────────────────────────────────────────────────────────────────
50
51/// A connector reference: a kind (`parquet`, `kafka`, …) plus key/value options.
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ConnectorSpec {
54    pub kind: String,
55    pub options: HashMap<String, String>,
56}
57
58impl ConnectorSpec {
59    /// Fetch a required option, or a descriptive error if it is missing.
60    pub fn require(&self, key: &str) -> SqlResult<&str> {
61        self.options
62            .get(key)
63            .map(String::as_str)
64            .ok_or_else(|| SqlError::Unsupported {
65                feature: format!("connector '{}' requires option '{key}'", self.kind),
66            })
67    }
68}
69
/// A declared source: either a bounded SQL query (fed as insertions) or a
/// connector (e.g. `PARQUET(path='…')`).
///
/// Created by `CREATE SOURCE` and stored in a [`PipelineRegistry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceSpec {
    /// `AS <query>` — rows from a bounded query. The parser stores the text
    /// after `AS`, trimmed.
    Query(String),
    /// `FROM <CONNECTOR>(...)` — rows pulled from a connector.
    Connector(ConnectorSpec),
}
79
/// A declared sink: which view's output it collects, and optionally where it is
/// written (a connector). With no connector, the output is returned as a result
/// set by `START PIPELINE`.
///
/// Created by `CREATE SINK` and stored in a [`PipelineRegistry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkSpec {
    /// Name of the view whose output this sink collects (the `FROM <view>` part).
    pub view: String,
    /// Destination from the optional `INTO <CONNECTOR>(...)` clause; `None` when omitted.
    pub connector: Option<ConnectorSpec>,
}
88
89/// SQL-layer registry of declared pipeline sources and sinks (metadata only).
90#[derive(Debug, Default)]
91pub struct PipelineRegistry {
92    sources: RwLock<HashMap<String, SourceSpec>>,
93    sinks: RwLock<HashMap<String, SinkSpec>>,
94}
95
96fn poisoned() -> SqlError {
97    SqlError::DataFusion {
98        message: "pipeline registry lock poisoned".into(),
99    }
100}
101
102impl PipelineRegistry {
103    pub fn new() -> Self {
104        Self::default()
105    }
106
107    pub fn register_source(&self, name: impl Into<String>, spec: SourceSpec) -> SqlResult<()> {
108        self.sources
109            .write()
110            .map_err(|_| poisoned())?
111            .insert(name.into(), spec);
112        Ok(())
113    }
114
115    pub fn register_sink(&self, name: impl Into<String>, spec: SinkSpec) -> SqlResult<()> {
116        self.sinks
117            .write()
118            .map_err(|_| poisoned())?
119            .insert(name.into(), spec);
120        Ok(())
121    }
122
123    pub fn source(&self, name: &str) -> SqlResult<Option<SourceSpec>> {
124        Ok(self
125            .sources
126            .read()
127            .map_err(|_| poisoned())?
128            .get(name)
129            .cloned())
130    }
131
132    pub fn sink(&self, name: &str) -> SqlResult<Option<SinkSpec>> {
133        Ok(self
134            .sinks
135            .read()
136            .map_err(|_| poisoned())?
137            .get(name)
138            .cloned())
139    }
140
141    /// View name backing the given sink, if the sink is registered.
142    /// Returns `None` for unknown sinks.
143    pub fn view_for_sink(&self, name: &str) -> SqlResult<Option<String>> {
144        Ok(self
145            .sinks
146            .read()
147            .map_err(|_| poisoned())?
148            .get(name)
149            .map(|spec| spec.view.clone()))
150    }
151
152    /// Names of all declared sinks.
153    pub fn sink_names(&self) -> SqlResult<Vec<String>> {
154        Ok(self
155            .sinks
156            .read()
157            .map_err(|_| poisoned())?
158            .keys()
159            .cloned()
160            .collect())
161    }
162
163    /// All declared source specs `(name, spec)`.
164    pub fn sources(&self) -> SqlResult<Vec<(String, SourceSpec)>> {
165        Ok(self
166            .sources
167            .read()
168            .map_err(|_| poisoned())?
169            .iter()
170            .map(|(k, v)| (k.clone(), v.clone()))
171            .collect())
172    }
173
174    pub fn remove_source(&self, name: &str) -> SqlResult<bool> {
175        Ok(self
176            .sources
177            .write()
178            .map_err(|_| poisoned())?
179            .remove(name)
180            .is_some())
181    }
182
183    pub fn remove_sink(&self, name: &str) -> SqlResult<bool> {
184        Ok(self
185            .sinks
186            .write()
187            .map_err(|_| poisoned())?
188            .remove(name)
189            .is_some())
190    }
191}
192
193// ── Parser ──────────────────────────────────────────────────────────────────
194
195/// Parse a pipeline DDL statement, or `Ok(None)` if it is not one.
196pub fn parse_pipeline_statement(sql: &str) -> SqlResult<Option<PipelineStatement>> {
197    let trimmed = sql.trim().trim_end_matches(';').trim();
198    let upper = trimmed.to_uppercase();
199
200    if upper.starts_with("CREATE SOURCE ") {
201        let rest = &trimmed["CREATE SOURCE ".len()..];
202        // `... AS <query>` (query source) or `... FROM <CONNECTOR>(...)`.
203        if let Some((name, query)) = split_keyword(rest, " AS ") {
204            require_nonempty(&name)?;
205            if query.trim().is_empty() {
206                return Err(SqlError::Unsupported {
207                    feature: "CREATE SOURCE requires a query after AS".into(),
208                });
209            }
210            return Ok(Some(PipelineStatement::CreateSource {
211                name,
212                source: SourceSpec::Query(query.trim().to_string()),
213            }));
214        }
215        if let Some((name, conn)) = split_keyword(rest, " FROM ") {
216            require_nonempty(&name)?;
217            return Ok(Some(PipelineStatement::CreateSource {
218                name,
219                source: SourceSpec::Connector(parse_connector_spec(&conn)?),
220            }));
221        }
222        return Err(SqlError::Unsupported {
223            feature: "CREATE SOURCE requires '<name> AS <query>' or '<name> FROM <connector>(...)'"
224                .into(),
225        });
226    }
227
228    if upper.starts_with("CREATE SINK ") {
229        let rest = &trimmed["CREATE SINK ".len()..];
230        let (name, after_from) =
231            split_keyword(rest, " FROM ").ok_or_else(|| SqlError::Unsupported {
232                feature: "CREATE SINK requires '<name> FROM <view>'".into(),
233            })?;
234        require_nonempty(&name)?;
235        // Optional `INTO <CONNECTOR>(...)` after the view.
236        let (view, connector) = if let Some((view, conn)) = split_keyword(&after_from, " INTO ") {
237            (view, Some(parse_connector_spec(&conn)?))
238        } else {
239            (after_from.trim().to_string(), None)
240        };
241        let view = view.trim().to_string();
242        require_nonempty(&view)?;
243        return Ok(Some(PipelineStatement::CreateSink {
244            name,
245            view,
246            connector,
247        }));
248    }
249
250    if upper.starts_with("START PIPELINE ") {
251        let sink = trimmed["START PIPELINE ".len()..].trim().to_string();
252        require_nonempty(&sink)?;
253        return Ok(Some(PipelineStatement::StartPipeline { sink }));
254    }
255
256    if upper.starts_with("REFRESH PIPELINE ") {
257        let rest = trimmed["REFRESH PIPELINE ".len()..].trim();
258        // Optional trailing FULL keyword.
259        let (sink, full) = match rest.to_uppercase().strip_suffix(" FULL") {
260            Some(_) => (rest[..rest.len() - " FULL".len()].trim().to_string(), true),
261            None => (rest.to_string(), false),
262        };
263        require_nonempty(&sink)?;
264        return Ok(Some(PipelineStatement::RefreshPipeline { sink, full }));
265    }
266
267    if upper.starts_with("DROP SOURCE ") {
268        let name = trimmed["DROP SOURCE ".len()..].trim().to_string();
269        require_nonempty(&name)?;
270        return Ok(Some(PipelineStatement::DropSource { name }));
271    }
272
273    if upper.starts_with("DROP SINK ") {
274        let name = trimmed["DROP SINK ".len()..].trim().to_string();
275        require_nonempty(&name)?;
276        return Ok(Some(PipelineStatement::DropSink { name }));
277    }
278
279    Ok(None)
280}
281
282/// Apply a CREATE/DROP pipeline DDL to the registry. `START PIPELINE` is **not**
283/// handled here (it needs the `krishiv-api` execution layer); it returns
284/// `Ok(None)` so the caller can intercept it.
285///
286/// Returns `Some(name)` if a CREATE/DROP statement was applied.
287pub fn execute_pipeline_ddl(registry: &PipelineRegistry, sql: &str) -> SqlResult<Option<String>> {
288    let Some(stmt) = parse_pipeline_statement(sql)? else {
289        return Ok(None);
290    };
291    match stmt {
292        PipelineStatement::CreateSource { name, source } => {
293            registry.register_source(name.clone(), source)?;
294            Ok(Some(name))
295        }
296        PipelineStatement::CreateSink {
297            name,
298            view,
299            connector,
300        } => {
301            registry.register_sink(name.clone(), SinkSpec { view, connector })?;
302            Ok(Some(name))
303        }
304        PipelineStatement::DropSource { name } => {
305            registry.remove_source(&name)?;
306            Ok(Some(name))
307        }
308        PipelineStatement::DropSink { name } => {
309            registry.remove_sink(&name)?;
310            Ok(Some(name))
311        }
312        // START / REFRESH PIPELINE are handled by the api layer, not the registry.
313        PipelineStatement::StartPipeline { .. } | PipelineStatement::RefreshPipeline { .. } => {
314            Ok(None)
315        }
316    }
317}
318
319// ── helpers ─────────────────────────────────────────────────────────────────
320
/// Split `rest` on the first ASCII-case-insensitive occurrence of `keyword`
/// (e.g. `" AS "`), returning `(before_trimmed, after)`; `None` if absent.
///
/// Folding is ASCII-only on purpose: `to_ascii_uppercase` changes no byte
/// offsets, so an index found in the folded copy is always a valid slice
/// position in `rest`. Unicode `to_uppercase` can change byte lengths
/// (`'ı'` -> `"I"`, `'ſ'` -> `"S"`), which mis-sliced or panicked on a
/// non-char boundary. All callers pass ASCII SQL keywords.
fn split_keyword(rest: &str, keyword: &str) -> Option<(String, String)> {
    let haystack = rest.to_ascii_uppercase();
    let needle = keyword.to_ascii_uppercase();
    let idx = haystack.find(&needle)?;
    let before = rest[..idx].trim().to_string();
    let after = rest[idx + needle.len()..].to_string();
    Some((before, after))
}
331
332fn require_nonempty(s: &str) -> SqlResult<()> {
333    if s.trim().is_empty() {
334        Err(SqlError::EmptyTableName)
335    } else {
336        Ok(())
337    }
338}
339
340/// Parse a connector reference of the form `KIND(key='value', key2='value2')`.
341/// Keys are lowercased; values are unquoted (single or double quotes).
342fn parse_connector_spec(s: &str) -> SqlResult<ConnectorSpec> {
343    let s = s.trim();
344    let open = s.find('(').ok_or_else(|| SqlError::Unsupported {
345        feature: "connector spec must be '<KIND>(key='value', ...)'".into(),
346    })?;
347    let close = s.rfind(')').ok_or_else(|| SqlError::Unsupported {
348        feature: "connector spec missing closing ')'".into(),
349    })?;
350    if close < open {
351        return Err(SqlError::Unsupported {
352            feature: "connector spec has malformed parentheses".into(),
353        });
354    }
355    let kind = s[..open].trim().to_lowercase();
356    require_nonempty(&kind)?;
357
358    let mut options = HashMap::new();
359    for part in s[open + 1..close].split(',') {
360        let part = part.trim();
361        if part.is_empty() {
362            continue;
363        }
364        let (k, v) = part.split_once('=').ok_or_else(|| SqlError::Unsupported {
365            feature: format!("connector option '{part}' must be 'key=value'"),
366        })?;
367        let v = v.trim().trim_matches(['\'', '"']);
368        options.insert(k.trim().to_lowercase(), v.to_string());
369    }
370    Ok(ConnectorSpec { kind, options })
371}
372
373// ── tests ─────────────────────────────────────────────────────────────────────
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `sql`, panicking if the parser returns an error.
    fn parse(sql: &str) -> Option<PipelineStatement> {
        parse_pipeline_statement(sql).unwrap()
    }

    #[test]
    fn parse_create_source_query() {
        let expected = PipelineStatement::CreateSource {
            name: "orders".into(),
            source: SourceSpec::Query("SELECT * FROM raw".into()),
        };
        assert_eq!(parse("CREATE SOURCE orders AS SELECT * FROM raw"), Some(expected));
    }

    #[test]
    fn parse_create_source_connector() {
        match parse("CREATE SOURCE orders FROM PARQUET(path='/data/o.parquet')") {
            Some(PipelineStatement::CreateSource {
                name,
                source: SourceSpec::Connector(spec),
            }) => {
                assert_eq!(name, "orders");
                assert_eq!(spec.kind, "parquet");
                assert_eq!(spec.require("path").unwrap(), "/data/o.parquet");
            }
            _ => panic!("expected connector source"),
        }
    }

    #[test]
    fn parse_create_sink_plain_and_connector() {
        // Without INTO, the sink's output is returned as a result set.
        let plain = PipelineStatement::CreateSink {
            name: "out".into(),
            view: "revenue".into(),
            connector: None,
        };
        assert_eq!(parse("CREATE SINK out FROM revenue;"), Some(plain));

        // With INTO, the sink writes through a connector.
        match parse("CREATE SINK out FROM revenue INTO PARQUET(path='/o.parquet')") {
            Some(PipelineStatement::CreateSink {
                name,
                view,
                connector: Some(spec),
            }) => {
                assert_eq!((name.as_str(), view.as_str()), ("out", "revenue"));
                assert_eq!(spec.kind, "parquet");
                assert_eq!(spec.require("path").unwrap(), "/o.parquet");
            }
            _ => panic!("expected connector sink"),
        }
    }

    #[test]
    fn parse_start_and_drops() {
        let cases = [
            (
                "START PIPELINE out",
                PipelineStatement::StartPipeline { sink: "out".into() },
            ),
            (
                "DROP SOURCE orders",
                PipelineStatement::DropSource {
                    name: "orders".into(),
                },
            ),
            (
                "DROP SINK out",
                PipelineStatement::DropSink { name: "out".into() },
            ),
        ];
        for (sql, expected) in cases {
            assert_eq!(parse(sql), Some(expected));
        }
    }

    #[test]
    fn parse_refresh_pipeline() {
        for (sql, full) in [
            ("REFRESH PIPELINE out", false),
            ("REFRESH PIPELINE out FULL;", true),
        ] {
            let expected = PipelineStatement::RefreshPipeline {
                sink: "out".into(),
                full,
            };
            assert_eq!(parse(sql), Some(expected));
        }
    }

    #[test]
    fn non_pipeline_sql_returns_none() {
        assert!(parse("SELECT 1").is_none());
    }

    #[test]
    fn registry_create_drop_roundtrip() {
        let registry = PipelineRegistry::new();
        for ddl in [
            "CREATE SOURCE orders AS SELECT * FROM raw",
            "CREATE SINK out FROM revenue",
        ] {
            execute_pipeline_ddl(&registry, ddl).unwrap();
        }
        assert_eq!(
            registry.source("orders").unwrap(),
            Some(SourceSpec::Query("SELECT * FROM raw".into()))
        );
        assert_eq!(registry.sink("out").unwrap().unwrap().view, "revenue");

        // START PIPELINE is left for the api layer, not consumed here.
        assert!(execute_pipeline_ddl(&registry, "START PIPELINE out")
            .unwrap()
            .is_none());

        let dropped = execute_pipeline_ddl(&registry, "DROP SOURCE orders").unwrap();
        assert!(dropped.is_some());
        assert!(registry.source("orders").unwrap().is_none());
    }
}