laminar-sql
SQL layer for LaminarDB with streaming extensions.
Overview
Extends standard SQL (via sqlparser-rs) with streaming constructs like tumbling windows, session windows, watermarks, EMIT clauses, and ASOF joins. Integrates with Apache DataFusion for query planning and execution.
Key Modules
| Module | Purpose |
|---|---|
| parser | Streaming SQL parser: windows, emit, late data, joins, aggregation, analytics, ranking, DDL (CREATE SOURCE/STREAM/SINK/LOOKUP TABLE) |
| planner | StreamingPlanner converts parsed SQL into StreamingPlan / QueryPlan |
| translator | Operator config builders: window, join, analytic, order, having, DDL, ASOF join |
| datafusion | DataFusion integration: custom UDFs (tumble, hop, session, slide, first_value, last_value), aggregate bridge, execute_streaming_sql |
| error | User-friendly DataFusion error translation |
Streaming SQL Extensions
-- Tumbling windows
SELECT ... FROM source GROUP BY tumble(ts, INTERVAL '1' MINUTE)
-- Sliding windows
SELECT ... FROM source GROUP BY slide(ts, INTERVAL '5' MINUTE, INTERVAL '1' MINUTE)
-- Hopping windows
SELECT ... FROM source GROUP BY hop(ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND)
-- Session windows
SELECT ... FROM source GROUP BY session(ts, INTERVAL '30' SECOND)
-- Watermarks
CREATE SOURCE events (..., WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)
-- EMIT clause
SELECT ... EMIT ON WINDOW CLOSE
SELECT ... EMIT CHANGES
SELECT ... EMIT FINAL
-- ASOF JOIN
SELECT ... FROM orders o ASOF JOIN trades t ON o.symbol = t.symbol AND o.ts >= t.ts
-- Lookup tables
CREATE LOOKUP TABLE instruments FROM POSTGRES (...)
-- Late data handling
SELECT ... ALLOWED_LATENESS INTERVAL '10' SECOND
-- Window functions
SELECT ..., LAG(price, 1) OVER (PARTITION BY symbol ORDER BY ts) FROM trades
SELECT ..., ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY price DESC) FROM trades
-- Window frames
SELECT ..., SUM(vol) OVER (PARTITION BY sym ORDER BY ts ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)
-- Connector DDL
CREATE SOURCE ... FROM KAFKA (brokers = '...', topic = '...', format = 'json')
CREATE SOURCE ... FROM POSTGRES_CDC (hostname = '...', database = '...')
CREATE SOURCE ... FROM MYSQL_CDC (hostname = '...', database = '...')
CREATE SINK ... INTO KAFKA (brokers = '...', topic = '...')
CREATE SINK ... INTO DELTA_LAKE (path = '...')
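The ASOF JOIN condition above (`o.symbol = t.symbol AND o.ts >= t.ts`) pairs each order with the most recent trade on the same symbol that is not newer than the order. The following is a minimal sketch of that matching rule in plain Rust, not the operator shipped in laminar-core. The `Order` and `Trade` structs and the `asof_join` function are hypothetical names, and returning `None` for unmatched orders (rather than dropping them) is an assumption.

```rust
// Hypothetical row types for illustration only; not part of laminar-sql.
#[derive(Debug, Clone, PartialEq)]
struct Trade {
    symbol: &'static str,
    ts: i64,
    price: f64,
}

struct Order {
    symbol: &'static str,
    ts: i64,
}

/// For each order, pick the latest trade with the same symbol and
/// `trade.ts <= order.ts`. Naive O(orders * trades) scan, kept simple
/// on purpose; a real operator would keep per-symbol sorted state.
fn asof_join<'a>(orders: &[Order], trades: &'a [Trade]) -> Vec<Option<&'a Trade>> {
    orders
        .iter()
        .map(|o| {
            trades
                .iter()
                .filter(|t| t.symbol == o.symbol && t.ts <= o.ts)
                .max_by_key(|t| t.ts)
        })
        .collect()
}

fn main() {
    let trades = vec![
        Trade { symbol: "AAPL", ts: 100, price: 10.0 },
        Trade { symbol: "AAPL", ts: 200, price: 11.0 },
        Trade { symbol: "MSFT", ts: 150, price: 50.0 },
    ];
    let orders = vec![
        Order { symbol: "AAPL", ts: 250 }, // matches the AAPL trade at ts = 200
        Order { symbol: "AAPL", ts: 50 },  // no earlier trade, so no match
        Order { symbol: "MSFT", ts: 150 }, // equal timestamps match (>=)
    ];
    for (order, matched) in orders.iter().zip(asof_join(&orders, &trades)) {
        println!("{} @ {} -> {:?}", order.symbol, order.ts, matched);
    }
}
```

The equality part of the condition selects the partition (symbol) and the inequality selects the single closest earlier row, which is what distinguishes an ASOF join from a regular range join that would return every earlier trade.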
Custom UDFs Registered with DataFusion
| Function | Description |
|---|---|
| tumble(ts, interval) | Tumbling window assignment |
| hop(ts, slide, size) | Hopping/sliding window assignment |
| session(ts, gap) | Session window assignment |
| slide(ts, size, slide) | Alias for hop |
| first_value(col) | First value in window (retractable) |
| last_value(col) | Last value in window (retractable) |
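To make "window assignment" concrete, here is a small sketch of how a timestamp could map to tumbling and hopping windows. It is an illustration under stated assumptions, not the crate's UDF implementation: timestamps are milliseconds as `i64`, windows are half-open `[start, start + size)` and aligned to zero, and `tumble_start` and `hop_starts` are hypothetical helper names. Note that the sketch takes `(size, slide)` in that order, while the table lists `hop(ts, slide, size)` and `slide(ts, size, slide)`.

```rust
/// Start of the tumbling window of length `size_ms` that contains `ts_ms`.
/// `rem_euclid` keeps the result correct for negative timestamps too.
fn tumble_start(ts_ms: i64, size_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(size_ms)
}

/// Starts of every hopping window of length `size_ms`, advancing by
/// `slide_ms`, that contains `ts_ms`. Returned in ascending order.
fn hop_starts(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<i64> {
    let mut starts = Vec::new();
    // Latest window start at or before the timestamp.
    let mut start = ts_ms - ts_ms.rem_euclid(slide_ms);
    // Walk backwards while [start, start + size) still covers the timestamp.
    while start + size_ms > ts_ms {
        starts.push(start);
        start -= slide_ms;
    }
    starts.reverse();
    starts
}

fn main() {
    let ts = 1_000_000; // 16 min 40 s, in milliseconds

    // One-minute tumbling window: each event belongs to exactly one window.
    println!("{}", tumble_start(ts, 60_000)); // prints 960000

    // Five-minute windows sliding every minute: each event belongs to
    // size / slide = 5 overlapping windows.
    println!("{:?}", hop_starts(ts, 300_000, 60_000));
    // prints [720000, 780000, 840000, 900000, 960000]
}
```

A tumbling window is the special case of a hopping window whose slide equals its size, so `hop_starts(ts, n, n)` yields the single start that `tumble_start(ts, n)` returns.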
Public API
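use laminar_sql::{parse_streaming_sql, StreamingPlanner, execute_streaming_sql};
// Parse streaming SQL
let statements = parse_streaming_sql(...)?;
// Plan a query
let planner = StreamingPlanner::new();
let plan = planner.plan(...)?;
// Execute via DataFusion
let result = execute_streaming_sql(...).await?;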
Related Crates
- laminar-core -- Operator implementations that execute the plans
- laminar-db -- Database facade that orchestrates SQL execution