mod query_planner;
mod sql_parser;
pub use sql_parser::{
ColumnConstraintSyntax, CreatePump, DurationFunction, FromItemSyntax, GroupingElementSyntax,
OptionSyntax, ParseSuccess, PestParserImpl, SelectFieldSyntax, SelectStreamSyntax, SqlParser,
SubFromItemSyntax,
};
use crate::{
api::error::Result,
pipeline::{Pipeline, PumpModel, SinkWriterModel, SourceReaderModel, StreamModel},
sql_processor::query_planner::QueryPlanner,
stream_engine::command::{AlterPipelineCommand, Command, QueryPlan},
};
#[derive(Debug, Default)]
pub struct SqlProcessor(SqlParser);
impl SqlProcessor {
pub fn compile<S: Into<String>>(&self, sql: S, pipeline: &Pipeline) -> Result<Command> {
let command = match self.0.parse(sql)? {
ParseSuccess::CreateSourceStream(source_stream_model) => {
self.compile_create_source_stream(source_stream_model, pipeline)?
}
ParseSuccess::CreateSourceReader(source_reader_model) => {
self.compile_create_source_reader(source_reader_model, pipeline)?
}
ParseSuccess::CreateStream(stream_model) => {
self.compile_create_stream(stream_model, pipeline)?
}
ParseSuccess::CreateSinkStream(sink_stream_model) => {
self.compile_create_sink_stream(sink_stream_model, pipeline)?
}
ParseSuccess::CreateSinkWriter(sink_writer_model) => {
self.compile_create_sink_writer(sink_writer_model, pipeline)?
}
ParseSuccess::CreatePump(create_pump) => {
self.compile_create_pump(*create_pump, pipeline)?
}
};
Ok(command)
}
fn compile_create_source_stream(
&self,
source_stream_model: StreamModel,
_pipeline: &Pipeline,
) -> Result<Command> {
Ok(Command::AlterPipeline(
AlterPipelineCommand::CreateSourceStream(source_stream_model),
))
}
fn compile_create_source_reader(
&self,
source_reader_model: SourceReaderModel,
_pipeline: &Pipeline,
) -> Result<Command> {
Ok(Command::AlterPipeline(
AlterPipelineCommand::CreateSourceReader(source_reader_model),
))
}
fn compile_create_stream(
&self,
stream_model: StreamModel,
_pipeline: &Pipeline,
) -> Result<Command> {
Ok(Command::AlterPipeline(AlterPipelineCommand::CreateStream(
stream_model,
)))
}
fn compile_create_sink_stream(
&self,
sink_stream_model: StreamModel,
_pipeline: &Pipeline,
) -> Result<Command> {
Ok(Command::AlterPipeline(
AlterPipelineCommand::CreateSinkStream(sink_stream_model),
))
}
fn compile_create_sink_writer(
&self,
sink_writer_model: SinkWriterModel,
_pipeline: &Pipeline,
) -> Result<Command> {
Ok(Command::AlterPipeline(
AlterPipelineCommand::CreateSinkWriter(sink_writer_model),
))
}
fn compile_create_pump(&self, create_pump: CreatePump, pipeline: &Pipeline) -> Result<Command> {
let query_plan = self.compile_select_stream(create_pump.select_stream_syntax, pipeline)?;
let pump = PumpModel::new(create_pump.pump_name, query_plan, create_pump.insert_plan);
Ok(Command::AlterPipeline(AlterPipelineCommand::CreatePump(
Box::new(pump),
)))
}
fn compile_select_stream(
&self,
select_stream_syntax: SelectStreamSyntax,
pipeline: &Pipeline,
) -> Result<QueryPlan> {
let planner = QueryPlanner::new(select_stream_syntax);
planner.plan(pipeline)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
pipeline::{
OptionsBuilder, PipelineVersion, SinkWriterModel, SinkWriterName, SinkWriterType,
SourceReaderModel, SourceReaderName, SourceReaderType, StreamModel, StreamName,
StreamShape,
},
stream_engine::command::AlterPipelineCommand,
};
use pretty_assertions::assert_eq;
#[test]
fn test_create_source_stream() {
let processor = SqlProcessor::default();
let pipeline = Pipeline::new(PipelineVersion::new());
let sql = "
CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
";
let command = processor.compile(sql, &pipeline).unwrap();
let expected_shape = StreamShape::fx_trade();
let expected_stream =
StreamModel::new(StreamName::new("source_trade".to_string()), expected_shape);
assert_eq!(
command,
Command::AlterPipeline(AlterPipelineCommand::CreateSourceStream(expected_stream))
);
}
#[test]
fn test_create_source_reader() {
let processor = SqlProcessor::default();
let pipeline = Pipeline::fx_source_only();
let sql = "
CREATE SOURCE READER tcp_source FOR st_1
TYPE NET_CLIENT OPTIONS (
REMOTE_PORT '17890'
);
";
let command = processor.compile(sql, &pipeline).unwrap();
let expected_name = SourceReaderName::new("tcp_source".to_string());
let expected_options = OptionsBuilder::default()
.add("REMOTE_PORT", "17890")
.build();
let expected_dest_source_stream = StreamName::new("st_1".to_string());
let expected_source = SourceReaderModel::new(
expected_name,
SourceReaderType::NetClient,
expected_dest_source_stream,
expected_options,
);
assert_eq!(
command,
Command::AlterPipeline(AlterPipelineCommand::CreateSourceReader(expected_source))
);
}
#[test]
fn test_create_sink_stream() {
let processor = SqlProcessor::default();
let pipeline = Pipeline::new(PipelineVersion::new());
let sql = "
CREATE SINK STREAM sink_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
";
let command = processor.compile(sql, &pipeline).unwrap();
let expected_shape = StreamShape::fx_trade();
let expected_stream =
StreamModel::new(StreamName::new("sink_trade".to_string()), expected_shape);
assert_eq!(
command,
Command::AlterPipeline(AlterPipelineCommand::CreateSinkStream(expected_stream))
);
}
#[test]
fn test_create_sink_writer() {
let processor = SqlProcessor::default();
let pipeline = Pipeline::fx_sink_only();
let sql = "
CREATE SINK WRITER tcp_sink_trade FOR sink_1
TYPE NET_CLIENT OPTIONS (
REMOTE_PORT '17890'
);
";
let command = processor.compile(sql, &pipeline).unwrap();
let expected_options = OptionsBuilder::default()
.add("REMOTE_PORT", "17890")
.build();
let expected_sink = SinkWriterModel::new(
SinkWriterName::new("tcp_sink_trade".to_string()),
SinkWriterType::Net,
StreamName::new("sink_1".to_string()),
expected_options,
);
assert_eq!(
command,
Command::AlterPipeline(AlterPipelineCommand::CreateSinkWriter(expected_sink))
);
}
}