use springql::{SpringConfig, SpringPipeline, SpringSourceRowBuilder};
/// Builds one fixed sample row and pushes it to the queue `queue_name` of `pipeline`.
///
/// The row has two columns: `ts`, set to the string
/// `2022-01-01 13:00:00.000000000`, and `temperature`, set to `5.3`.
///
/// # Panics
///
/// Panics if adding either column fails or if the push itself returns an error.
fn push_row_to_pipeline(pipeline: &SpringPipeline, queue_name: &str) {
    // Each `add_column` call hands back the builder wrapped in a `Result`;
    // rebinding keeps every step (and its potential panic) on its own line.
    let builder = SpringSourceRowBuilder::default();
    let builder = builder
        .add_column("ts", "2022-01-01 13:00:00.000000000".to_string())
        .unwrap();
    let builder = builder.add_column("temperature", 5.3).unwrap();

    let sample_row = builder.build();
    pipeline.push(queue_name, sample_row).unwrap();
}
/// Runs a small Celsius-to-Fahrenheit streaming demo.
///
/// Steps, in order:
/// 1. Create a pipeline from the default configuration.
/// 2. Issue five SQL commands that define a source stream, a sink stream, a pump
///    computing `32.0 + temperature * 1.8`, an in-memory sink writer named
///    `q_sink`, and an in-memory source reader named `q_src`.
/// 3. Push ten sample rows into `q_src`.
/// 4. Pop rows from `q_sink` and print `ts<TAB>temperature` to stderr until
///    `pop` returns an error.
///
/// # Panics
///
/// Panics if pipeline creation, any command, any push, or reading a column from
/// a popped row fails.
fn main() {
    let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap();

    // The statements are issued strictly in array order. The literal contents are
    // kept flush-left so the text sent to the pipeline is unchanged.
    let ddl_statements = [
        "
CREATE SOURCE STREAM source_temperature_celsius (
ts TIMESTAMP NOT NULL ROWTIME,
temperature FLOAT NOT NULL
);
",
        "
CREATE SINK STREAM sink_temperature_fahrenheit (
ts TIMESTAMP NOT NULL ROWTIME,
temperature FLOAT NOT NULL
);
",
        "
CREATE PUMP c_to_f AS
INSERT INTO sink_temperature_fahrenheit (ts, temperature)
SELECT STREAM
source_temperature_celsius.ts,
32.0 + source_temperature_celsius.temperature * 1.8
FROM source_temperature_celsius;
",
        "
CREATE SINK WRITER queue_temperature_fahrenheit FOR sink_temperature_fahrenheit
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q_sink'
);
",
        "
CREATE SOURCE READER queue_temperature_celsius FOR source_temperature_celsius
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q_src'
);
",
    ];
    for statement in ddl_statements.iter().copied() {
        pipeline.command(statement).unwrap();
    }

    // Feed ten identical sample rows into the source queue.
    (0..10).for_each(|_| push_row_to_pipeline(&pipeline, "q_src"));

    // Drain the sink queue; the loop ends only when `pop` reports an error.
    // NOTE(review): if `pop` blocks while the queue is empty, this loop never
    // terminates once all rows are printed — confirm against the springql docs.
    loop {
        let sink_row = match pipeline.pop("q_sink") {
            Ok(popped) => popped,
            Err(_) => break,
        };
        let timestamp: String = sink_row.get_not_null_by_index(0).unwrap();
        let fahrenheit: f32 = sink_row.get_not_null_by_index(1).unwrap();
        eprintln!("{}\t{}", timestamp, fahrenheit);
    }
}