use std::collections::HashMap;
use std::sync::RwLock;
use crate::{SqlError, SqlResult};
/// A pipeline DDL statement, as produced by [`parse_pipeline_statement`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineStatement {
/// `CREATE SOURCE <name> AS <query>` or `CREATE SOURCE <name> FROM <connector>(...)`.
CreateSource { name: String, source: SourceSpec },
/// `CREATE SINK <name> FROM <view>`, optionally followed by `INTO <connector>(...)`.
CreateSink {
name: String,
view: String,
connector: Option<ConnectorSpec>,
},
/// `START PIPELINE <sink>`.
StartPipeline { sink: String },
/// `REFRESH PIPELINE <sink>`; `full` is set when the statement ends with ` FULL`.
RefreshPipeline { sink: String, full: bool },
/// `DROP SOURCE <name>`.
DropSource { name: String },
/// `DROP SINK <name>`.
DropSink { name: String },
}
/// A connector reference of the form `<KIND>(key='value', ...)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectorSpec {
/// Connector kind; `parse_connector_spec` stores it lower-cased.
pub kind: String,
/// Connector options; `parse_connector_spec` lower-cases the keys and
/// removes surrounding quotes from the values.
pub options: HashMap<String, String>,
}
impl ConnectorSpec {
    /// Looks up a mandatory connector option.
    ///
    /// # Errors
    ///
    /// Returns [`SqlError::Unsupported`] naming the connector kind and the
    /// missing key when `key` is not present in `options`.
    pub fn require(&self, key: &str) -> SqlResult<&str> {
        match self.options.get(key) {
            Some(value) => Ok(value.as_str()),
            None => Err(SqlError::Unsupported {
                feature: format!("connector '{}' requires option '{key}'", self.kind),
            }),
        }
    }
}
/// Where a source gets its data from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceSpec {
/// The query text that followed `AS` in `CREATE SOURCE <name> AS <query>`.
Query(String),
/// The connector that followed `FROM` in `CREATE SOURCE <name> FROM <connector>(...)`.
Connector(ConnectorSpec),
}
/// Definition of a sink as stored in [`PipelineRegistry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkSpec {
/// Name of the view the sink reads from (the `FROM <view>` part of `CREATE SINK`).
pub view: String,
/// Connector from the optional `INTO <connector>(...)` clause, if one was given.
pub connector: Option<ConnectorSpec>,
}
/// In-memory registry of pipeline sources and sinks, keyed by name.
///
/// Each map sits behind its own [`RwLock`], so all methods take `&self`.
#[derive(Debug, Default)]
pub struct PipelineRegistry {
/// Registered sources by name.
sources: RwLock<HashMap<String, SourceSpec>>,
/// Registered sinks by name.
sinks: RwLock<HashMap<String, SinkSpec>>,
}
/// Builds the error reported when one of the registry locks is poisoned.
fn poisoned() -> SqlError {
    let message = "pipeline registry lock poisoned".into();
    SqlError::DataFusion { message }
}
impl PipelineRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Default::default()
    }

    /// Stores `spec` under `name`, replacing any source already registered
    /// under that name.
    ///
    /// # Errors
    ///
    /// Fails if the sources lock is poisoned.
    pub fn register_source(&self, name: impl Into<String>, spec: SourceSpec) -> SqlResult<()> {
        let mut sources = self.sources.write().map_err(|_| poisoned())?;
        sources.insert(name.into(), spec);
        Ok(())
    }

    /// Stores `spec` under `name`, replacing any sink already registered
    /// under that name.
    ///
    /// # Errors
    ///
    /// Fails if the sinks lock is poisoned.
    pub fn register_sink(&self, name: impl Into<String>, spec: SinkSpec) -> SqlResult<()> {
        let mut sinks = self.sinks.write().map_err(|_| poisoned())?;
        sinks.insert(name.into(), spec);
        Ok(())
    }

    /// Returns a copy of the source registered under `name`, if any.
    ///
    /// # Errors
    ///
    /// Fails if the sources lock is poisoned.
    pub fn source(&self, name: &str) -> SqlResult<Option<SourceSpec>> {
        let sources = self.sources.read().map_err(|_| poisoned())?;
        let found = sources.get(name).cloned();
        Ok(found)
    }

    /// Returns a copy of the sink registered under `name`, if any.
    ///
    /// # Errors
    ///
    /// Fails if the sinks lock is poisoned.
    pub fn sink(&self, name: &str) -> SqlResult<Option<SinkSpec>> {
        let sinks = self.sinks.read().map_err(|_| poisoned())?;
        let found = sinks.get(name).cloned();
        Ok(found)
    }

    /// Returns the view name of the sink registered under `name`, if any.
    ///
    /// # Errors
    ///
    /// Fails if the sinks lock is poisoned.
    pub fn view_for_sink(&self, name: &str) -> SqlResult<Option<String>> {
        let sinks = self.sinks.read().map_err(|_| poisoned())?;
        let view = match sinks.get(name) {
            Some(spec) => Some(spec.view.clone()),
            None => None,
        };
        Ok(view)
    }

    /// Returns the names of all registered sinks, in the map's iteration order.
    ///
    /// # Errors
    ///
    /// Fails if the sinks lock is poisoned.
    pub fn sink_names(&self) -> SqlResult<Vec<String>> {
        let sinks = self.sinks.read().map_err(|_| poisoned())?;
        let mut names = Vec::with_capacity(sinks.len());
        for name in sinks.keys() {
            names.push(name.clone());
        }
        Ok(names)
    }

    /// Returns a snapshot of all registered sources as `(name, spec)` pairs,
    /// in the map's iteration order.
    ///
    /// # Errors
    ///
    /// Fails if the sources lock is poisoned.
    pub fn sources(&self) -> SqlResult<Vec<(String, SourceSpec)>> {
        let sources = self.sources.read().map_err(|_| poisoned())?;
        let mut snapshot = Vec::with_capacity(sources.len());
        for (name, spec) in sources.iter() {
            snapshot.push((name.clone(), spec.clone()));
        }
        Ok(snapshot)
    }

    /// Removes the source registered under `name`; returns whether one existed.
    ///
    /// # Errors
    ///
    /// Fails if the sources lock is poisoned.
    pub fn remove_source(&self, name: &str) -> SqlResult<bool> {
        let mut sources = self.sources.write().map_err(|_| poisoned())?;
        let existed = sources.remove(name).is_some();
        Ok(existed)
    }

    /// Removes the sink registered under `name`; returns whether one existed.
    ///
    /// # Errors
    ///
    /// Fails if the sinks lock is poisoned.
    pub fn remove_sink(&self, name: &str) -> SqlResult<bool> {
        let mut sinks = self.sinks.write().map_err(|_| poisoned())?;
        let existed = sinks.remove(name).is_some();
        Ok(existed)
    }
}
/// Parses a single pipeline DDL statement.
///
/// Surrounding whitespace and trailing `;` are ignored. Keywords are matched
/// ASCII-case-insensitively and must be separated by single spaces.
///
/// Recognised forms:
///
/// * `CREATE SOURCE <name> AS <query>`
/// * `CREATE SOURCE <name> FROM <connector>(...)`
/// * `CREATE SINK <name> FROM <view> [INTO <connector>(...)]`
/// * `START PIPELINE <sink>`
/// * `REFRESH PIPELINE <sink> [FULL]`
/// * `DROP SOURCE <name>` / `DROP SINK <name>`
///
/// Returns `Ok(None)` when `sql` does not start with one of these prefixes,
/// so the caller can hand it to another parser.
///
/// # Errors
///
/// Returns [`SqlError::Unsupported`] for a recognised prefix with a malformed
/// remainder, [`SqlError::EmptyTableName`] when a required name is blank, and
/// whatever `parse_connector_spec` reports for a bad connector clause.
pub fn parse_pipeline_statement(sql: &str) -> SqlResult<Option<PipelineStatement>> {
    const CREATE_SOURCE: &str = "CREATE SOURCE ";
    const CREATE_SINK: &str = "CREATE SINK ";
    const START_PIPELINE: &str = "START PIPELINE ";
    const REFRESH_PIPELINE: &str = "REFRESH PIPELINE ";
    const DROP_SOURCE: &str = "DROP SOURCE ";
    const DROP_SINK: &str = "DROP SINK ";

    let trimmed = sql.trim().trim_end_matches(';').trim();
    // ASCII-only folding never changes byte lengths, so every offset found in
    // `upper` is a valid char boundary at the same position in `trimmed`.
    // Full Unicode upper-casing can grow or shrink the string and would
    // misalign the two.
    let upper = trimmed.to_ascii_uppercase();

    if upper.starts_with(CREATE_SOURCE) {
        let rest = &trimmed[CREATE_SOURCE.len()..];
        let rest_upper = &upper[CREATE_SOURCE.len()..];
        let as_at = rest_upper.find(" AS ");
        let from_at = rest_upper.find(" FROM ");
        // The name comes first, so whichever keyword appears first decides
        // the form. An " AS " that only occurs after " FROM " belongs to the
        // connector clause (e.g. inside a quoted path), not to the statement.
        let query_at = match (as_at, from_at) {
            (Some(a), Some(f)) if f < a => None,
            (a, _) => a,
        };
        if let Some(at) = query_at {
            let name = rest[..at].trim().to_string();
            require_nonempty(&name)?;
            // `rest` has no trailing whitespace and " AS " ends in a space,
            // so at least one non-blank character always follows the keyword.
            let query = rest[at + " AS ".len()..].trim();
            return Ok(Some(PipelineStatement::CreateSource {
                name,
                source: SourceSpec::Query(query.to_string()),
            }));
        }
        if let Some(at) = from_at {
            let name = rest[..at].trim().to_string();
            require_nonempty(&name)?;
            let connector = parse_connector_spec(&rest[at + " FROM ".len()..])?;
            return Ok(Some(PipelineStatement::CreateSource {
                name,
                source: SourceSpec::Connector(connector),
            }));
        }
        // A dangling `AS` loses its trailing space to the trim above, so it
        // has to be detected here rather than in the query branch.
        if rest_upper.ends_with(" AS") {
            return Err(SqlError::Unsupported {
                feature: "CREATE SOURCE requires a query after AS".into(),
            });
        }
        return Err(SqlError::Unsupported {
            feature: "CREATE SOURCE requires '<name> AS <query>' or '<name> FROM <connector>(...)'"
                .into(),
        });
    }

    if upper.starts_with(CREATE_SINK) {
        let rest = &trimmed[CREATE_SINK.len()..];
        let rest_upper = &upper[CREATE_SINK.len()..];
        let from_at = rest_upper
            .find(" FROM ")
            .ok_or_else(|| SqlError::Unsupported {
                feature: "CREATE SINK requires '<name> FROM <view>'".into(),
            })?;
        let name = rest[..from_at].trim().to_string();
        require_nonempty(&name)?;

        let tail_start = from_at + " FROM ".len();
        let tail = &rest[tail_start..];
        let tail_upper = &rest_upper[tail_start..];
        let (view, connector) = match tail_upper.find(" INTO ") {
            Some(into_at) => {
                let spec = parse_connector_spec(&tail[into_at + " INTO ".len()..])?;
                (tail[..into_at].trim(), Some(spec))
            }
            None => (tail.trim(), None),
        };
        let view = view.to_string();
        require_nonempty(&view)?;
        return Ok(Some(PipelineStatement::CreateSink {
            name,
            view,
            connector,
        }));
    }

    if upper.starts_with(START_PIPELINE) {
        let sink = trimmed[START_PIPELINE.len()..].trim().to_string();
        require_nonempty(&sink)?;
        return Ok(Some(PipelineStatement::StartPipeline { sink }));
    }

    if upper.starts_with(REFRESH_PIPELINE) {
        let rest = trimmed[REFRESH_PIPELINE.len()..].trim();
        let (sink, full) = match rest.to_ascii_uppercase().strip_suffix(" FULL") {
            // `head` has the same byte length as the matching prefix of `rest`.
            Some(head) => (rest[..head.len()].trim().to_string(), true),
            None => (rest.to_string(), false),
        };
        require_nonempty(&sink)?;
        return Ok(Some(PipelineStatement::RefreshPipeline { sink, full }));
    }

    if upper.starts_with(DROP_SOURCE) {
        let name = trimmed[DROP_SOURCE.len()..].trim().to_string();
        require_nonempty(&name)?;
        return Ok(Some(PipelineStatement::DropSource { name }));
    }

    if upper.starts_with(DROP_SINK) {
        let name = trimmed[DROP_SINK.len()..].trim().to_string();
        require_nonempty(&name)?;
        return Ok(Some(PipelineStatement::DropSink { name }));
    }

    Ok(None)
}
/// Parses `sql` and applies registry-level pipeline DDL to `registry`.
///
/// * `CREATE SOURCE` / `CREATE SINK` register the definition and return the
///   created name.
/// * `DROP SOURCE` / `DROP SINK` remove the definition and return the name,
///   whether or not an entry with that name existed.
/// * `START PIPELINE` / `REFRESH PIPELINE` and anything that is not pipeline
///   DDL leave the registry untouched and return `Ok(None)`.
///
/// # Errors
///
/// Propagates parse errors from [`parse_pipeline_statement`] and errors
/// returned by the registry methods.
pub fn execute_pipeline_ddl(registry: &PipelineRegistry, sql: &str) -> SqlResult<Option<String>> {
    let affected = match parse_pipeline_statement(sql)? {
        None
        | Some(
            PipelineStatement::StartPipeline { .. } | PipelineStatement::RefreshPipeline { .. },
        ) => return Ok(None),
        Some(PipelineStatement::CreateSource { name, source }) => {
            registry.register_source(name.as_str(), source)?;
            name
        }
        Some(PipelineStatement::CreateSink {
            name,
            view,
            connector,
        }) => {
            let spec = SinkSpec { view, connector };
            registry.register_sink(name.as_str(), spec)?;
            name
        }
        Some(PipelineStatement::DropSource { name }) => {
            registry.remove_source(&name)?;
            name
        }
        Some(PipelineStatement::DropSink { name }) => {
            registry.remove_sink(&name)?;
            name
        }
    };
    Ok(Some(affected))
}
/// Splits `rest` at the first ASCII-case-insensitive occurrence of `keyword`.
///
/// Returns the trimmed text before the keyword and the untrimmed text after
/// it, or `None` when the keyword does not occur.
///
/// Only ASCII letters are case-folded. Full Unicode upper-casing can change a
/// string's byte length (for example `ﬁ` becomes `FI`), which would make an
/// offset found in the folded copy invalid for slicing `rest`.
fn split_keyword(rest: &str, keyword: &str) -> Option<(String, String)> {
    let haystack = rest.to_ascii_uppercase();
    let needle = keyword.to_ascii_uppercase();
    // `haystack` is byte-for-byte aligned with `rest`, so `idx` and
    // `idx + keyword.len()` are char boundaries in `rest` too.
    let idx = haystack.find(&needle)?;
    let before = rest[..idx].trim().to_string();
    let after = rest[idx + keyword.len()..].to_string();
    Some((before, after))
}
/// Rejects names that are empty or consist only of whitespace.
///
/// # Errors
///
/// Returns [`SqlError::EmptyTableName`] when `s` trims to the empty string.
fn require_nonempty(s: &str) -> SqlResult<()> {
    match s.trim() {
        "" => Err(SqlError::EmptyTableName),
        _ => Ok(()),
    }
}
/// Parses a connector clause of the form `<KIND>(key='value', ...)`.
///
/// The kind and the option keys are lower-cased. Option values may be wrapped
/// in single or double quotes; a quoted value may contain commas. Empty
/// options (e.g. a trailing comma) are skipped, and a repeated key keeps the
/// last value.
///
/// # Errors
///
/// Returns [`SqlError::Unsupported`] when the parentheses are missing or out
/// of order, or when an option has no `=`. Returns the error from
/// `require_nonempty` when the connector kind is blank.
fn parse_connector_spec(s: &str) -> SqlResult<ConnectorSpec> {
    let s = s.trim();
    let open = s.find('(').ok_or_else(|| SqlError::Unsupported {
        feature: "connector spec must be '<KIND>(key='value', ...)'".into(),
    })?;
    let close = s.rfind(')').ok_or_else(|| SqlError::Unsupported {
        feature: "connector spec missing closing ')'".into(),
    })?;
    if close < open {
        return Err(SqlError::Unsupported {
            feature: "connector spec has malformed parentheses".into(),
        });
    }
    let kind = s[..open].trim().to_lowercase();
    require_nonempty(&kind)?;

    let body = &s[open + 1..close];
    // Unbalanced quotes fall back to a plain comma split so that malformed
    // input is handled exactly as leniently as before.
    let parts = split_unquoted_commas(body).unwrap_or_else(|| body.split(',').collect());

    let mut options = HashMap::new();
    for part in parts {
        let part = part.trim();
        if part.is_empty() {
            continue;
        }
        let (k, v) = part.split_once('=').ok_or_else(|| SqlError::Unsupported {
            feature: format!("connector option '{part}' must be 'key=value'"),
        })?;
        options.insert(k.trim().to_lowercase(), unquote(v.trim()).to_string());
    }
    Ok(ConnectorSpec { kind, options })
}

/// Splits `s` on commas that are not inside a quoted option value.
///
/// A quote only opens a quoted value when it directly follows `=` (ignoring
/// whitespace), so a stray apostrophe in an unquoted value does not swallow
/// the following options. Returns `None` when a quoted value is never closed.
fn split_unquoted_commas(s: &str) -> Option<Vec<&str>> {
    let mut parts = Vec::new();
    let mut start = 0;
    let mut open_quote: Option<char> = None;
    // Last non-whitespace character seen outside of quotes in the current part.
    let mut prev: Option<char> = None;
    for (i, c) in s.char_indices() {
        if let Some(q) = open_quote {
            if c == q {
                open_quote = None;
                prev = Some(c);
            }
            continue;
        }
        match c {
            '\'' | '"' if prev == Some('=') => open_quote = Some(c),
            ',' => {
                parts.push(&s[start..i]);
                start = i + 1;
                prev = None;
            }
            _ if c.is_whitespace() => {}
            _ => prev = Some(c),
        }
    }
    if open_quote.is_some() {
        return None;
    }
    parts.push(&s[start..]);
    Some(parts)
}

/// Removes one pair of matching quotes around `v`.
///
/// When `v` is not wrapped in a matching pair, any leading or trailing quote
/// characters are trimmed, which keeps the previous lenient behaviour for
/// sloppy input such as `'abc`.
fn unquote(v: &str) -> &str {
    for q in ['\'', '"'] {
        if let Some(inner) = v.strip_prefix(q).and_then(|r| r.strip_suffix(q)) {
            return inner;
        }
    }
    v.trim_matches(['\'', '"'])
}
/// Unit tests for the pipeline DDL parser and the registry round trip.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_create_source_query() {
        let s = parse_pipeline_statement("CREATE SOURCE orders AS SELECT * FROM raw").unwrap();
        assert_eq!(
            s,
            Some(PipelineStatement::CreateSource {
                name: "orders".into(),
                source: SourceSpec::Query("SELECT * FROM raw".into()),
            })
        );
    }

    #[test]
    fn parse_create_source_connector() {
        let s =
            parse_pipeline_statement("CREATE SOURCE orders FROM PARQUET(path='/data/o.parquet')")
                .unwrap();
        let Some(PipelineStatement::CreateSource {
            name,
            source: SourceSpec::Connector(spec),
        }) = s
        else {
            panic!("expected connector source");
        };
        assert_eq!(name, "orders");
        assert_eq!(spec.kind, "parquet");
        assert_eq!(spec.require("path").unwrap(), "/data/o.parquet");
    }

    #[test]
    fn parse_create_sink_plain_and_connector() {
        assert_eq!(
            parse_pipeline_statement("CREATE SINK out FROM revenue;").unwrap(),
            Some(PipelineStatement::CreateSink {
                name: "out".into(),
                view: "revenue".into(),
                connector: None,
            })
        );
        let Some(PipelineStatement::CreateSink {
            name,
            view,
            connector: Some(spec),
        }) = parse_pipeline_statement(
            "CREATE SINK out FROM revenue INTO PARQUET(path='/o.parquet')",
        )
        .unwrap()
        else {
            panic!("expected connector sink");
        };
        assert_eq!((name.as_str(), view.as_str()), ("out", "revenue"));
        assert_eq!(spec.kind, "parquet");
        assert_eq!(spec.require("path").unwrap(), "/o.parquet");
    }

    #[test]
    fn parse_start_and_drops() {
        assert_eq!(
            parse_pipeline_statement("START PIPELINE out").unwrap(),
            Some(PipelineStatement::StartPipeline { sink: "out".into() })
        );
        assert_eq!(
            parse_pipeline_statement("DROP SOURCE orders").unwrap(),
            Some(PipelineStatement::DropSource {
                name: "orders".into()
            })
        );
        assert_eq!(
            parse_pipeline_statement("DROP SINK out").unwrap(),
            Some(PipelineStatement::DropSink { name: "out".into() })
        );
    }

    #[test]
    fn parse_refresh_pipeline() {
        assert_eq!(
            parse_pipeline_statement("REFRESH PIPELINE out").unwrap(),
            Some(PipelineStatement::RefreshPipeline {
                sink: "out".into(),
                full: false
            })
        );
        assert_eq!(
            parse_pipeline_statement("REFRESH PIPELINE out FULL;").unwrap(),
            Some(PipelineStatement::RefreshPipeline {
                sink: "out".into(),
                full: true
            })
        );
    }

    #[test]
    fn non_pipeline_sql_returns_none() {
        assert_eq!(parse_pipeline_statement("SELECT 1").unwrap(), None);
    }

    #[test]
    fn registry_create_drop_roundtrip() {
        let reg = PipelineRegistry::new();
        execute_pipeline_ddl(&reg, "CREATE SOURCE orders AS SELECT * FROM raw").unwrap();
        execute_pipeline_ddl(&reg, "CREATE SINK out FROM revenue").unwrap();
        assert_eq!(
            reg.source("orders").unwrap().unwrap(),
            SourceSpec::Query("SELECT * FROM raw".into())
        );
        assert_eq!(reg.sink("out").unwrap().unwrap().view, "revenue");
        // START PIPELINE is parsed but has no registry-level effect.
        assert_eq!(
            execute_pipeline_ddl(&reg, "START PIPELINE out").unwrap(),
            None
        );
        assert!(
            execute_pipeline_ddl(&reg, "DROP SOURCE orders")
                .unwrap()
                .is_some()
        );
        assert!(reg.source("orders").unwrap().is_none());
    }
}