1use std::collections::HashMap;
21use std::sync::RwLock;
22
23use crate::{SqlError, SqlResult};
24
25#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum PipelineStatement {
30 CreateSource { name: String, source: SourceSpec },
32 CreateSink {
34 name: String,
35 view: String,
36 connector: Option<ConnectorSpec>,
37 },
38 StartPipeline { sink: String },
40 RefreshPipeline { sink: String, full: bool },
43 DropSource { name: String },
45 DropSink { name: String },
47}
48
/// A connector reference such as `PARQUET(path='/data/o.parquet')`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ConnectorSpec {
    /// Connector kind (the text before `(`); the parser stores it lowercased.
    pub kind: String,
    /// Connector options; the parser lowercases keys and strips surrounding
    /// quotes from values.
    pub options: HashMap<String, String>,
}
57
58impl ConnectorSpec {
59 pub fn require(&self, key: &str) -> SqlResult<&str> {
61 self.options
62 .get(key)
63 .map(String::as_str)
64 .ok_or_else(|| SqlError::Unsupported {
65 feature: format!("connector '{}' requires option '{key}'", self.kind),
66 })
67 }
68}
69
70#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum SourceSpec {
74 Query(String),
76 Connector(ConnectorSpec),
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
84pub struct SinkSpec {
85 pub view: String,
86 pub connector: Option<ConnectorSpec>,
87}
88
89#[derive(Debug, Default)]
91pub struct PipelineRegistry {
92 sources: RwLock<HashMap<String, SourceSpec>>,
93 sinks: RwLock<HashMap<String, SinkSpec>>,
94}
95
96fn poisoned() -> SqlError {
97 SqlError::DataFusion {
98 message: "pipeline registry lock poisoned".into(),
99 }
100}
101
102impl PipelineRegistry {
103 pub fn new() -> Self {
104 Self::default()
105 }
106
107 pub fn register_source(&self, name: impl Into<String>, spec: SourceSpec) -> SqlResult<()> {
108 self.sources
109 .write()
110 .map_err(|_| poisoned())?
111 .insert(name.into(), spec);
112 Ok(())
113 }
114
115 pub fn register_sink(&self, name: impl Into<String>, spec: SinkSpec) -> SqlResult<()> {
116 self.sinks
117 .write()
118 .map_err(|_| poisoned())?
119 .insert(name.into(), spec);
120 Ok(())
121 }
122
123 pub fn source(&self, name: &str) -> SqlResult<Option<SourceSpec>> {
124 Ok(self
125 .sources
126 .read()
127 .map_err(|_| poisoned())?
128 .get(name)
129 .cloned())
130 }
131
132 pub fn sink(&self, name: &str) -> SqlResult<Option<SinkSpec>> {
133 Ok(self
134 .sinks
135 .read()
136 .map_err(|_| poisoned())?
137 .get(name)
138 .cloned())
139 }
140
141 pub fn view_for_sink(&self, name: &str) -> SqlResult<Option<String>> {
144 Ok(self
145 .sinks
146 .read()
147 .map_err(|_| poisoned())?
148 .get(name)
149 .map(|spec| spec.view.clone()))
150 }
151
152 pub fn sink_names(&self) -> SqlResult<Vec<String>> {
154 Ok(self
155 .sinks
156 .read()
157 .map_err(|_| poisoned())?
158 .keys()
159 .cloned()
160 .collect())
161 }
162
163 pub fn sources(&self) -> SqlResult<Vec<(String, SourceSpec)>> {
165 Ok(self
166 .sources
167 .read()
168 .map_err(|_| poisoned())?
169 .iter()
170 .map(|(k, v)| (k.clone(), v.clone()))
171 .collect())
172 }
173
174 pub fn remove_source(&self, name: &str) -> SqlResult<bool> {
175 Ok(self
176 .sources
177 .write()
178 .map_err(|_| poisoned())?
179 .remove(name)
180 .is_some())
181 }
182
183 pub fn remove_sink(&self, name: &str) -> SqlResult<bool> {
184 Ok(self
185 .sinks
186 .write()
187 .map_err(|_| poisoned())?
188 .remove(name)
189 .is_some())
190 }
191}
192
193pub fn parse_pipeline_statement(sql: &str) -> SqlResult<Option<PipelineStatement>> {
197 let trimmed = sql.trim().trim_end_matches(';').trim();
198 let upper = trimmed.to_uppercase();
199
200 if upper.starts_with("CREATE SOURCE ") {
201 let rest = &trimmed["CREATE SOURCE ".len()..];
202 if let Some((name, query)) = split_keyword(rest, " AS ") {
204 require_nonempty(&name)?;
205 if query.trim().is_empty() {
206 return Err(SqlError::Unsupported {
207 feature: "CREATE SOURCE requires a query after AS".into(),
208 });
209 }
210 return Ok(Some(PipelineStatement::CreateSource {
211 name,
212 source: SourceSpec::Query(query.trim().to_string()),
213 }));
214 }
215 if let Some((name, conn)) = split_keyword(rest, " FROM ") {
216 require_nonempty(&name)?;
217 return Ok(Some(PipelineStatement::CreateSource {
218 name,
219 source: SourceSpec::Connector(parse_connector_spec(&conn)?),
220 }));
221 }
222 return Err(SqlError::Unsupported {
223 feature: "CREATE SOURCE requires '<name> AS <query>' or '<name> FROM <connector>(...)'"
224 .into(),
225 });
226 }
227
228 if upper.starts_with("CREATE SINK ") {
229 let rest = &trimmed["CREATE SINK ".len()..];
230 let (name, after_from) =
231 split_keyword(rest, " FROM ").ok_or_else(|| SqlError::Unsupported {
232 feature: "CREATE SINK requires '<name> FROM <view>'".into(),
233 })?;
234 require_nonempty(&name)?;
235 let (view, connector) = if let Some((view, conn)) = split_keyword(&after_from, " INTO ") {
237 (view, Some(parse_connector_spec(&conn)?))
238 } else {
239 (after_from.trim().to_string(), None)
240 };
241 let view = view.trim().to_string();
242 require_nonempty(&view)?;
243 return Ok(Some(PipelineStatement::CreateSink {
244 name,
245 view,
246 connector,
247 }));
248 }
249
250 if upper.starts_with("START PIPELINE ") {
251 let sink = trimmed["START PIPELINE ".len()..].trim().to_string();
252 require_nonempty(&sink)?;
253 return Ok(Some(PipelineStatement::StartPipeline { sink }));
254 }
255
256 if upper.starts_with("REFRESH PIPELINE ") {
257 let rest = trimmed["REFRESH PIPELINE ".len()..].trim();
258 let (sink, full) = match rest.to_uppercase().strip_suffix(" FULL") {
260 Some(_) => (rest[..rest.len() - " FULL".len()].trim().to_string(), true),
261 None => (rest.to_string(), false),
262 };
263 require_nonempty(&sink)?;
264 return Ok(Some(PipelineStatement::RefreshPipeline { sink, full }));
265 }
266
267 if upper.starts_with("DROP SOURCE ") {
268 let name = trimmed["DROP SOURCE ".len()..].trim().to_string();
269 require_nonempty(&name)?;
270 return Ok(Some(PipelineStatement::DropSource { name }));
271 }
272
273 if upper.starts_with("DROP SINK ") {
274 let name = trimmed["DROP SINK ".len()..].trim().to_string();
275 require_nonempty(&name)?;
276 return Ok(Some(PipelineStatement::DropSink { name }));
277 }
278
279 Ok(None)
280}
281
282pub fn execute_pipeline_ddl(registry: &PipelineRegistry, sql: &str) -> SqlResult<Option<String>> {
288 let Some(stmt) = parse_pipeline_statement(sql)? else {
289 return Ok(None);
290 };
291 match stmt {
292 PipelineStatement::CreateSource { name, source } => {
293 registry.register_source(name.clone(), source)?;
294 Ok(Some(name))
295 }
296 PipelineStatement::CreateSink {
297 name,
298 view,
299 connector,
300 } => {
301 registry.register_sink(name.clone(), SinkSpec { view, connector })?;
302 Ok(Some(name))
303 }
304 PipelineStatement::DropSource { name } => {
305 registry.remove_source(&name)?;
306 Ok(Some(name))
307 }
308 PipelineStatement::DropSink { name } => {
309 registry.remove_sink(&name)?;
310 Ok(Some(name))
311 }
312 PipelineStatement::StartPipeline { .. } | PipelineStatement::RefreshPipeline { .. } => {
314 Ok(None)
315 }
316 }
317}
318
/// Splits `rest` around the first ASCII-case-insensitive occurrence of `keyword`.
///
/// Surrounding spaces in `keyword` (e.g. `" AS "`) mean "a whitespace character
/// must appear here". Any whitespace satisfies this, including newlines and
/// tabs, so multi-line statements split correctly. A padded keyword therefore
/// only matches as a standalone word.
///
/// Returns `(before, after)`:
/// - `before` is the trimmed text preceding the keyword.
/// - `after` is the text following the keyword and its single trailing
///   whitespace character, untrimmed.
///
/// Returns `None` when no such occurrence exists.
fn split_keyword(rest: &str, keyword: &str) -> Option<(String, String)> {
    let word = keyword.trim().to_ascii_uppercase();
    let needs_ws_before = keyword.starts_with(char::is_whitespace);
    let needs_ws_after = keyword.ends_with(char::is_whitespace);
    // `to_ascii_uppercase` only rewrites ASCII bytes, so every byte offset in
    // `upper` is the same (valid) offset in `rest`. `to_uppercase` can change
    // the byte length of non-ASCII characters (e.g. 'ı' -> 'I'), which would
    // misalign the slices below or panic on a non-char boundary.
    let upper = rest.to_ascii_uppercase();

    upper.match_indices(word.as_str()).find_map(|(start, _)| {
        let end = start + word.len();
        let prev = rest[..start].chars().next_back();
        let next = rest[end..].chars().next();
        let before_ok = !needs_ws_before || prev.map_or(false, char::is_whitespace);
        let after_ok = !needs_ws_after || next.map_or(false, char::is_whitespace);
        if !(before_ok && after_ok) {
            return None;
        }
        // Consume exactly one separator character after the keyword, mirroring
        // the literal trailing space in the padded keyword.
        let after_start = if needs_ws_after {
            end + next.map_or(0, char::len_utf8)
        } else {
            end
        };
        Some((rest[..start].trim().to_string(), rest[after_start..].to_string()))
    })
}
331
332fn require_nonempty(s: &str) -> SqlResult<()> {
333 if s.trim().is_empty() {
334 Err(SqlError::EmptyTableName)
335 } else {
336 Ok(())
337 }
338}
339
340fn parse_connector_spec(s: &str) -> SqlResult<ConnectorSpec> {
343 let s = s.trim();
344 let open = s.find('(').ok_or_else(|| SqlError::Unsupported {
345 feature: "connector spec must be '<KIND>(key='value', ...)'".into(),
346 })?;
347 let close = s.rfind(')').ok_or_else(|| SqlError::Unsupported {
348 feature: "connector spec missing closing ')'".into(),
349 })?;
350 if close < open {
351 return Err(SqlError::Unsupported {
352 feature: "connector spec has malformed parentheses".into(),
353 });
354 }
355 let kind = s[..open].trim().to_lowercase();
356 require_nonempty(&kind)?;
357
358 let mut options = HashMap::new();
359 for part in s[open + 1..close].split(',') {
360 let part = part.trim();
361 if part.is_empty() {
362 continue;
363 }
364 let (k, v) = part.split_once('=').ok_or_else(|| SqlError::Unsupported {
365 feature: format!("connector option '{part}' must be 'key=value'"),
366 })?;
367 let v = v.trim().trim_matches(['\'', '"']);
368 options.insert(k.trim().to_lowercase(), v.to_string());
369 }
370 Ok(ConnectorSpec { kind, options })
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_create_source_query() {
        let s = parse_pipeline_statement("CREATE SOURCE orders AS SELECT * FROM raw").unwrap();
        assert_eq!(
            s,
            Some(PipelineStatement::CreateSource {
                name: "orders".into(),
                source: SourceSpec::Query("SELECT * FROM raw".into()),
            })
        );
    }

    #[test]
    fn parse_create_source_connector() {
        let s =
            parse_pipeline_statement("CREATE SOURCE orders FROM PARQUET(path='/data/o.parquet')")
                .unwrap();
        let Some(PipelineStatement::CreateSource {
            name,
            source: SourceSpec::Connector(spec),
        }) = s
        else {
            panic!("expected connector source");
        };
        assert_eq!(name, "orders");
        assert_eq!(spec.kind, "parquet");
        assert_eq!(spec.require("path").unwrap(), "/data/o.parquet");
    }

    #[test]
    fn parse_create_sink_plain_and_connector() {
        assert_eq!(
            parse_pipeline_statement("CREATE SINK out FROM revenue;").unwrap(),
            Some(PipelineStatement::CreateSink {
                name: "out".into(),
                view: "revenue".into(),
                connector: None,
            })
        );
        let Some(PipelineStatement::CreateSink {
            name,
            view,
            connector: Some(spec),
        }) = parse_pipeline_statement(
            "CREATE SINK out FROM revenue INTO PARQUET(path='/o.parquet')",
        )
        .unwrap()
        else {
            panic!("expected connector sink");
        };
        assert_eq!((name.as_str(), view.as_str()), ("out", "revenue"));
        assert_eq!(spec.kind, "parquet");
        assert_eq!(spec.require("path").unwrap(), "/o.parquet");
    }

    #[test]
    fn parse_start_and_drops() {
        assert_eq!(
            parse_pipeline_statement("START PIPELINE out").unwrap(),
            Some(PipelineStatement::StartPipeline { sink: "out".into() })
        );
        assert_eq!(
            parse_pipeline_statement("DROP SOURCE orders").unwrap(),
            Some(PipelineStatement::DropSource {
                name: "orders".into()
            })
        );
        assert_eq!(
            parse_pipeline_statement("DROP SINK out").unwrap(),
            Some(PipelineStatement::DropSink { name: "out".into() })
        );
    }

    #[test]
    fn parse_refresh_pipeline() {
        assert_eq!(
            parse_pipeline_statement("REFRESH PIPELINE out").unwrap(),
            Some(PipelineStatement::RefreshPipeline {
                sink: "out".into(),
                full: false
            })
        );
        assert_eq!(
            parse_pipeline_statement("REFRESH PIPELINE out FULL;").unwrap(),
            Some(PipelineStatement::RefreshPipeline {
                sink: "out".into(),
                full: true
            })
        );
    }

    #[test]
    fn non_pipeline_sql_returns_none() {
        assert_eq!(parse_pipeline_statement("SELECT 1").unwrap(), None);
    }

    #[test]
    fn keywords_are_case_insensitive() {
        assert_eq!(
            parse_pipeline_statement("create sink out from revenue").unwrap(),
            Some(PipelineStatement::CreateSink {
                name: "out".into(),
                view: "revenue".into(),
                connector: None,
            })
        );
    }

    #[test]
    fn create_source_without_body_is_rejected() {
        assert!(parse_pipeline_statement("CREATE SOURCE orders").is_err());
    }

    #[test]
    fn connector_option_keys_are_lowercased() {
        let Some(PipelineStatement::CreateSource {
            source: SourceSpec::Connector(spec),
            ..
        }) = parse_pipeline_statement("CREATE SOURCE o FROM CSV(PATH='/x.csv')").unwrap()
        else {
            panic!("expected connector source");
        };
        assert_eq!(spec.kind, "csv");
        assert_eq!(spec.require("path").unwrap(), "/x.csv");
        assert!(matches!(
            spec.require("missing"),
            Err(SqlError::Unsupported { .. })
        ));
    }

    #[test]
    fn registry_create_drop_roundtrip() {
        let reg = PipelineRegistry::new();
        execute_pipeline_ddl(&reg, "CREATE SOURCE orders AS SELECT * FROM raw").unwrap();
        execute_pipeline_ddl(&reg, "CREATE SINK out FROM revenue").unwrap();
        assert_eq!(
            reg.source("orders").unwrap().unwrap(),
            SourceSpec::Query("SELECT * FROM raw".into())
        );
        assert_eq!(reg.sink("out").unwrap().unwrap().view, "revenue");
        assert_eq!(
            execute_pipeline_ddl(&reg, "START PIPELINE out").unwrap(),
            None
        );
        assert!(
            execute_pipeline_ddl(&reg, "DROP SOURCE orders")
                .unwrap()
                .is_some()
        );
        assert!(reg.source("orders").unwrap().is_none());
    }

    #[test]
    fn registry_sink_lifecycle() {
        let reg = PipelineRegistry::new();
        execute_pipeline_ddl(&reg, "CREATE SINK out FROM revenue").unwrap();
        assert_eq!(
            reg.view_for_sink("out").unwrap().as_deref(),
            Some("revenue")
        );
        assert_eq!(reg.sink_names().unwrap(), vec!["out".to_string()]);
        assert_eq!(
            execute_pipeline_ddl(&reg, "REFRESH PIPELINE out FULL").unwrap(),
            None
        );
        assert_eq!(
            execute_pipeline_ddl(&reg, "DROP SINK out")
                .unwrap()
                .as_deref(),
            Some("out")
        );
        assert!(reg.sink("out").unwrap().is_none());
        assert!(reg.sink_names().unwrap().is_empty());
    }
}
499}