DataFusion integration for SQL processing
This module provides the integration layer between LaminarDB’s push-based
streaming engine and DataFusion’s pull-based SQL query execution.
§Architecture
┌─────────────────────────────────────────────────────────────────┐
│                    Ring 2: Query Planning                       │
│  SQL Query → SessionContext → LogicalPlan → ExecutionPlan       │
│                                      │                          │
│                              StreamingScanExec                  │
│                                      │                          │
│                              ┌───────▼──────┐                   │
│                              │ StreamBridge │ (tokio channel)   │
│                              └───────▲──────┘                   │
├──────────────────────────────────────┼──────────────────────────┤
│                    Ring 0: Hot Path  │                          │
│                                      │                          │
│  Source → Reactor.poll() ────────────┘                          │
│     (Events with RecordBatch data)                              │
└─────────────────────────────────────────────────────────────────┘
§Components
- StreamSource: Trait for streaming data sources
- StreamBridge: Channel-based push-to-pull bridge
- StreamingScanExec: DataFusion execution plan for streaming scans
- StreamingTableProvider: DataFusion table provider for streaming sources
- ChannelStreamSource: Concrete source using channels
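The push-to-pull idea behind the bridge can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not LaminarDB's implementation: it uses a bounded `std::sync::mpsc` channel instead of a tokio channel, a `Vec<i64>` stands in for a `RecordBatch`, and the names `Batch`, `bridge`, `pull_all`, and `run_demo` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Stand-in for an Arrow `RecordBatch` (assumption for this sketch).
type Batch = Vec<i64>;

/// Creates a bounded bridge: the producer pushes, the consumer pulls.
fn bridge(capacity: usize) -> (SyncSender<Batch>, Receiver<Batch>) {
    sync_channel(capacity)
}

/// Pull side: drains batches until every sender has been dropped.
fn pull_all(rx: Receiver<Batch>) -> Vec<Batch> {
    rx.into_iter().collect()
}

/// Pushes `n` single-row batches from a producer thread, then pulls them.
fn run_demo(n: i64) -> Vec<Batch> {
    let (tx, rx) = bridge(4);
    let producer = thread::spawn(move || {
        for i in 0..n {
            // Blocks while the bridge is full, which applies backpressure.
            tx.send(vec![i]).expect("consumer hung up");
        }
        // `tx` is dropped here, which ends the consumer's iteration.
    });
    let batches = pull_all(rx);
    producer.join().expect("producer panicked");
    batches
}

fn main() {
    let batches = run_demo(3);
    println!("{batches:?}");
}
```

The bounded capacity is the design point: the push side cannot run arbitrarily far ahead of the query that pulls from the other end.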
§Usage
use std::sync::Arc;

// Assumption: the Arrow schema types are imported from the `arrow` crate.
use arrow::datatypes::{DataType, Field, Schema};
use laminar_sql::datafusion::{
    create_streaming_context, ChannelStreamSource, StreamingTableProvider,
};

// Create a streaming context
let ctx = create_streaming_context();

// Create a channel source
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int64, false),
    Field::new("value", DataType::Float64, true),
]));
let source = Arc::new(ChannelStreamSource::new(schema));
let sender = source.sender();

// Register as a table
let provider = StreamingTableProvider::new("events", source);
ctx.register_table("events", Arc::new(provider))?;

// Push data from the Reactor (`batch` is a RecordBatch matching `schema`)
sender.send(batch).await?;

// Execute SQL queries
let df = ctx.sql("SELECT * FROM events WHERE value > 100").await?;

Re-exports§
pub use aggregate_bridge::create_aggregate_factory;
pub use aggregate_bridge::lookup_aggregate_udf;
pub use aggregate_bridge::result_to_scalar_value;
pub use aggregate_bridge::scalar_value_to_result;
pub use aggregate_bridge::DataFusionAccumulatorAdapter;
pub use aggregate_bridge::DataFusionAggregateFactory;
pub use complex_type_lambda::register_lambda_functions;
pub use complex_type_lambda::ArrayFilter;
pub use complex_type_lambda::ArrayReduce;
pub use complex_type_lambda::ArrayTransform;
pub use complex_type_lambda::MapFilter;
pub use complex_type_lambda::MapTransformValues;
pub use complex_type_udf::register_complex_type_functions;
pub use complex_type_udf::MapContainsKey;
pub use complex_type_udf::MapFromArrays;
pub use complex_type_udf::MapKeys;
pub use complex_type_udf::MapValues;
pub use complex_type_udf::StructDrop;
pub use complex_type_udf::StructExtract;
pub use complex_type_udf::StructMerge;
pub use complex_type_udf::StructRename;
pub use complex_type_udf::StructSet;
pub use execute::execute_streaming_sql;
pub use execute::DdlResult;
pub use execute::QueryResult;
pub use execute::StreamingSqlResult;
pub use format_bridge_udf::FromJsonUdf;
pub use format_bridge_udf::ParseEpochUdf;
pub use format_bridge_udf::ParseTimestampUdf;
pub use format_bridge_udf::ToJsonUdf;
pub use json_extensions::register_json_extensions;
pub use json_extensions::JsonInferSchema;
pub use json_extensions::JsonToColumns;
pub use json_extensions::JsonbDeepMerge;
pub use json_extensions::JsonbExcept;
pub use json_extensions::JsonbFlatten;
pub use json_extensions::JsonbMerge;
pub use json_extensions::JsonbPick;
pub use json_extensions::JsonbRenameKeys;
pub use json_extensions::JsonbStripNulls;
pub use json_extensions::JsonbUnflatten;
pub use json_path::CompiledJsonPath;
pub use json_path::JsonPathStep;
pub use json_path::JsonbPathExistsUdf;
pub use json_path::JsonbPathMatchUdf;
pub use json_tvf::register_json_table_functions;
pub use json_tvf::JsonbArrayElementsTextTvf;
pub use json_tvf::JsonbArrayElementsTvf;
pub use json_tvf::JsonbEachTextTvf;
pub use json_tvf::JsonbEachTvf;
pub use json_tvf::JsonbObjectKeysTvf;
pub use json_udaf::JsonAgg;
pub use json_udaf::JsonObjectAgg;
pub use json_udf::JsonBuildArray;
pub use json_udf::JsonBuildObject;
pub use json_udf::JsonTypeof;
pub use json_udf::JsonbContainedBy;
pub use json_udf::JsonbContains;
pub use json_udf::JsonbExists;
pub use json_udf::JsonbExistsAll;
pub use json_udf::JsonbExistsAny;
pub use json_udf::JsonbGet;
pub use json_udf::JsonbGetIdx;
pub use json_udf::JsonbGetPath;
pub use json_udf::JsonbGetPathText;
pub use json_udf::JsonbGetText;
pub use json_udf::JsonbGetTextIdx;
pub use json_udf::ToJsonb;
pub use proctime_udf::ProcTimeUdf;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::CumulateWindowStart;
pub use window_udf::HopWindowStart;
pub use window_udf::SessionWindowStart;
pub use window_udf::TumbleWindowStart;
Modules§
- aggregate_bridge: DataFusion aggregate bridge for streaming aggregation.
- complex_type_lambda: Lambda higher-order functions for arrays and maps (F-SCHEMA-015 Tier 3).
- complex_type_udf: Array, Struct, and Map scalar UDFs (F-SCHEMA-015).
- execute: End-to-end streaming SQL execution.
- format_bridge_udf: Format bridge scalar UDFs for inline format conversion (F-SCHEMA-014).
- json_extensions: LaminarDB streaming JSON extension UDFs (F-SCHEMA-013).
- json_path: SQL/JSON path query compiler and scalar UDFs (F-SCHEMA-012).
- json_tvf: JSON table-valued functions for array/object expansion (F-SCHEMA-012).
- json_types: JSONB binary format types for JSON UDF evaluation.
- json_udaf: PostgreSQL-compatible JSON aggregate UDAFs (F-SCHEMA-011).
- json_udf: PostgreSQL-compatible JSON scalar UDFs (F-SCHEMA-011).
- lookup_join: Lookup join plan node for DataFusion. LookupJoinNode is a custom DataFusion logical plan node for lookup joins.
- proctime_udf: Processing-time UDF for PROCTIME() support.
- watermark_udf: Watermark UDF for current watermark access.
- window_udf: Window function UDFs (TUMBLE, HOP, SESSION, CUMULATE).
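As an illustration of what a window-start function computes, the generic arithmetic for tumbling and hopping windows can be sketched as follows. This is textbook window assignment, not LaminarDB's implementation: the function names, the millisecond `i64` timestamps, and the choice to return every overlapping hop window are assumptions made for this sketch. Session and cumulate windows are not covered here.

```rust
/// Start of the tumbling window that contains `ts`.
/// `ts` and `size` are in the same unit (milliseconds in this sketch).
fn tumble_window_start(ts: i64, size: i64) -> i64 {
    assert!(size > 0, "window size must be positive");
    // `rem_euclid` keeps the result correct for negative timestamps.
    ts - ts.rem_euclid(size)
}

/// Starts of all hopping windows of length `size`, advancing by `slide`,
/// that contain `ts`. The latest window start comes first.
fn hop_window_starts(ts: i64, size: i64, slide: i64) -> Vec<i64> {
    assert!(size > 0 && slide > 0, "size and slide must be positive");
    let mut starts = Vec::new();
    // Latest slide-aligned start that is not after `ts`.
    let mut start = ts - ts.rem_euclid(slide);
    // A window [start, start + size) contains `ts` while start > ts - size.
    while start > ts - size {
        starts.push(start);
        start -= slide;
    }
    starts
}

fn main() {
    println!("{}", tumble_window_start(1_250, 1_000));
    println!("{:?}", hop_window_starts(1_250, 1_000, 500));
}
```

With a 1000 ms window and a 500 ms slide, each event falls into two overlapping windows, which is why the hopping variant returns a list in this sketch.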
Structs§
- BridgeSender: A cloneable sender for pushing RecordBatch instances into a bridge.
- BridgeStream: A stream that pulls RecordBatch instances from the bridge.
- ChannelStreamSource: A streaming source that receives data through a channel.
- SortColumn: Declares a column’s sort ordering for a streaming source.
- StreamBridge: A bridge that connects push-based data producers with pull-based consumers.
- StreamingScanExec: A DataFusion execution plan that scans from a streaming source.
- StreamingTableProvider: A DataFusion table provider backed by a streaming source.
Enums§
- BridgeSendError: Error when sending a batch to the bridge.
- BridgeTrySendError: Error when trying to send a batch without blocking.
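The distinction between a send that fails and a non-blocking send that fails can be illustrated with the standard library's bounded channel. This sketch does not use the bridge types listed above; `PushOutcome`, `try_push`, and `demo` are hypothetical names, and the variants of the real `BridgeTrySendError` may differ from those of `std::sync::mpsc::TrySendError`.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Outcome of a non-blocking push (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum PushOutcome {
    Sent,
    Full,
    Closed,
}

/// Attempts to push without blocking and classifies the result.
fn try_push(tx: &SyncSender<u32>, value: u32) -> PushOutcome {
    match tx.try_send(value) {
        Ok(()) => PushOutcome::Sent,
        // The buffer is full: the caller can retry later or drop the item.
        Err(TrySendError::Full(_)) => PushOutcome::Full,
        // The receiver is gone: retrying can never succeed.
        Err(TrySendError::Disconnected(_)) => PushOutcome::Closed,
    }
}

/// Exercises all three outcomes on a channel with capacity 1.
fn demo() -> Vec<PushOutcome> {
    let (tx, rx) = sync_channel::<u32>(1);
    let mut outcomes = vec![try_push(&tx, 1), try_push(&tx, 2)];
    // Drain the buffered item, then drop the receiving side.
    let _ = rx.recv();
    drop(rx);
    outcomes.push(try_push(&tx, 3));
    outcomes
}

fn main() {
    println!("{:?}", demo());
}
```

A full buffer is a transient condition, while a closed channel is permanent, so callers on a hot path typically handle the two cases differently.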
Traits§
- StreamSource: A source of streaming data for DataFusion queries.
Functions§
- create_streaming_context: Creates a DataFusion session context configured for streaming queries.
- register_json_functions: Registers all PostgreSQL-compatible JSON UDFs and UDAFs with the given SessionContext.
- register_streaming_functions: Registers LaminarDB streaming UDFs with a session context.
- register_streaming_functions_with_watermark: Registers streaming UDFs with a live watermark source.
Type Aliases§
- StreamSourceRef: A shared reference to a stream source.