Expand description
SpringQL implementation.
§High-level architecture diagram
§Examples
§Simple pipeline example
- create pipeline instance: SpringPipeline::new
- execute DDLs: SpringPipeline::command
- fetch row from pipeline: SpringPipeline::pop
use springql::{SpringPipeline, SpringConfig};
fn main() {
const SOURCE_PORT: u16 = 54300;
// create pipeline instans
let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap();
// execute DDLs for build pipeline
// source stream inputs to SpringQL pipeline
pipeline.command(
"CREATE SOURCE STREAM source_temperature_celsius (
ts TIMESTAMP NOT NULL ROWTIME,
temperature FLOAT NOT NULL
);", ).unwrap();
// sink stream output from SpringQL pipeline
pipeline.command(
"CREATE SINK STREAM sink_temperature_fahrenheit (
ts TIMESTAMP NOT NULL ROWTIME,
temperature FLOAT NOT NULL
);", ).unwrap();
// create pump for fetching rows from source
pipeline.command(
"CREATE PUMP c_to_f AS
INSERT INTO sink_temperature_fahrenheit (ts, temperature)
SELECT STREAM
source_temperature_celsius.ts,
32.0 + source_temperature_celsius.temperature * 1.8
FROM source_temperature_celsius;", ).unwrap();
// create sink writer, accessible by name "q", You can fetch row from `pipeline.pop("q")`
pipeline.command(
"CREATE SINK WRITER queue_temperature_fahrenheit FOR sink_temperature_fahrenheit
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q'
);", ).unwrap();
// create source reader, input come from net_server
pipeline.command(format!(
"CREATE SOURCE READER tcp_temperature_celsius FOR source_temperature_celsius
TYPE NET_SERVER OPTIONS (
PROTOCOL 'TCP',
PORT '{}'
);", SOURCE_PORT)).unwrap();
eprintln!("waiting JSON records in tcp/{} port...", SOURCE_PORT);
// fetch row from "q" , "q" is a sink writer defined above.
while let Ok(row) = pipeline.pop("q") {
// get field value from row by field index
let ts: String = row.get_not_null_by_index(0).unwrap();
let temperature_fahrenheit: f32 = row.get_not_null_by_index(1).unwrap();
// show in STDERR
eprintln!("{}\t{}", ts, temperature_fahrenheit);
}
}
Run this shell script to input data.
echo '{"ts": "2022-01-01 13:00:00.000000000", "temperature": 5.3}' | nc localhost 54300
§Using Window and share pipeline for many threads
- To share pipeline for threads, use std::sync::Arc
- non blocking fetch rom for sink pop_non_blocking
use std::{sync::Arc, thread, time::Duration};
use springql::{SpringPipeline, SpringConfig};
fn main() {
const SOURCE_PORT: u16 = 54300;
// Using Arc to share the reference between threads feeding sink rows.
let pipeline = Arc::new(SpringPipeline::new(&SpringConfig::default()).unwrap());
pipeline.command(
"CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
symbol TEXT NOT NULL,
amount INTEGER NOT NULL
);", ).unwrap();
pipeline.command(
"CREATE SINK STREAM sink_avg_all (
ts TIMESTAMP NOT NULL ROWTIME,
avg_amount FLOAT NOT NULL
);", ).unwrap();
pipeline.command(
"CREATE SINK STREAM sink_avg_by_symbol (
ts TIMESTAMP NOT NULL ROWTIME,
symbol TEXT NOT NULL,
avg_amount FLOAT NOT NULL
);", ).unwrap();
// Creates windows per 10 seconds ([:00, :10), [:10, :20), ...),
// and calculate the average amount over the rows inside each window.
//
// Second parameter `DURATION_SECS(0)` means allowed latency for late data.
//
// You can ignore here.
pipeline.command(
"CREATE PUMP avg_all AS
INSERT INTO sink_avg_all (ts, avg_amount)
SELECT STREAM
FLOOR_TIME(source_trade.ts, DURATION_SECS(10)) AS min_ts,
AVG(source_trade.amount) AS avg_amount
FROM source_trade
GROUP BY min_ts
FIXED WINDOW DURATION_SECS(10), DURATION_SECS(0);
", ).unwrap();
// Creates windows per 2 seconds ([:00, :02), [:02, :04), ...),
// and then group the rows inside each window having the same symbol.
// Calculate the average amount for each group.
pipeline.command(
"CREATE PUMP avg_by_symbol AS
INSERT INTO sink_avg_by_symbol (ts, symbol, avg_amount)
SELECT STREAM
FLOOR_TIME(source_trade.ts, DURATION_SECS(2)) AS min_ts,
source_trade.symbol AS symbol,
AVG(source_trade.amount) AS avg_amount
FROM source_trade
GROUP BY min_ts, symbol
FIXED WINDOW DURATION_SECS(2), DURATION_SECS(0);
", ).unwrap();
pipeline.command(
"CREATE SINK WRITER queue_avg_all FOR sink_avg_all
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q_avg_all'
);", ).unwrap();
pipeline.command(
"CREATE SINK WRITER queue_avg_by_symbol FOR sink_avg_by_symbol
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q_avg_by_symbol'
);", ).unwrap();
pipeline.command(format!(
"CREATE SOURCE READER tcp_trade FOR source_trade
TYPE NET_SERVER OPTIONS (
PROTOCOL 'TCP',
PORT '{}'
);", SOURCE_PORT)).unwrap();
eprintln!("waiting JSON records in tcp/{} port...", SOURCE_PORT);
loop {
// Fetching rows from q_avg_all.
if let Some(row) = pipeline.pop_non_blocking("q_avg_all").unwrap() {
let ts: String = row.get_not_null_by_index(0).unwrap();
let avg_amount: f32 = row.get_not_null_by_index(1).unwrap();
eprintln!("[q_avg_all] {}\t{}", ts, avg_amount);
}
// Fetching rows from q_avg_by_symbol
if let Some(row) = pipeline.pop_non_blocking("q_avg_by_symbol").unwrap() {
let ts: String = row.get_not_null_by_index(0).unwrap();
let symbol: String = row.get_not_null_by_index(1).unwrap();
let avg_amount: f32 = row.get_not_null_by_index(2).unwrap();
eprintln!("[q_avg_by_symbol] {}\t{}\t{}", ts, symbol, avg_amount);
}
// Avoid busy loop
thread::sleep(Duration::from_millis(10))
}
}
Modules§
- error
- Error type.
Structs§
- Spring
Config - Top-level config.
- Spring
Event Duration - Event-time duration.
- Spring
Memory Config - Config related to memory management.
- Spring
Pipeline - Pipeline.
- Spring
Sink Row - Row object from an in memory sink queue.
- Spring
Sink Writer Config - Config related to sink writer.
- Spring
Source Reader Config - Config related to source reader
- Spring
Source Row - Row object from an in memory sink queue.
- Spring
Source RowBuilder - Builder of
SpringSourceRow
. - Spring
Timestamp - Timestamp in UTC. Serializable.
- Spring
WebConsole Config - Config related to web console.
- Spring
Worker Config - Config related to worker threads.
Enums§
- Spring
Error - Error type
Traits§
- Spring
Value - Rust values can be unpacked from NnSqlValue back into them.
Type Aliases§
- Result
- Result type