Module datafusion

DataFusion integration for SQL processing

This module provides the integration layer between LaminarDB’s push-based streaming engine and DataFusion’s pull-based SQL query execution.

§Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Ring 2: Query Planning                        │
│  SQL Query → SessionContext → LogicalPlan → ExecutionPlan       │
│                                      │                          │
│                            StreamingScanExec                    │
│                                      │                          │
│                              ┌───────▼──────┐                   │
│                              │ StreamBridge │ (tokio channel)   │
│                              └───────▲──────┘                   │
├──────────────────────────────────────┼──────────────────────────┤
│                    Ring 0: Hot Path   │                          │
│                                      │                          │
│  Source → Reactor.poll() ────────────┘                          │
│              (Events with RecordBatch data)                     │
└─────────────────────────────────────────────────────────────────┘
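The push-to-pull handoff in the diagram can be illustrated with a bounded channel. The following is a minimal sketch using only the standard library, not the crate's implementation: the diagram indicates that the real StreamBridge uses a tokio channel, and `Batch`, `bridge`, and `run_pipeline` are hypothetical names introduced here for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Stand-in for an Arrow `RecordBatch`: a single column of i64 values.
type Batch = Vec<i64>;

/// Hypothetical bridge: the producer pushes into one end,
/// the consumer pulls from the other.
fn bridge(capacity: usize) -> (SyncSender<Batch>, Receiver<Batch>) {
    sync_channel(capacity)
}

/// Pushes `n` single-row batches from a producer thread, then pulls
/// them on the calling thread until the producer hangs up.
fn run_pipeline(n: i64) -> Vec<i64> {
    let (tx, rx) = bridge(2);

    // Push side (the role the Reactor plays in the diagram).
    let producer = thread::spawn(move || {
        for i in 0..n {
            // Blocks while the buffer is full, which applies
            // backpressure to the push side.
            tx.send(vec![i]).expect("consumer dropped");
        }
        // `tx` is dropped here, which ends the consumer's iteration.
    });

    // Pull side (the role the scan operator plays): iteration ends once
    // every sender is gone and the buffer has drained.
    let rows: Vec<i64> = rx.iter().flatten().collect();
    producer.join().expect("producer panicked");
    rows
}
```

Calling `run_pipeline(5)` collects the five pushed rows in send order. The bounded capacity is the design point: a slow query slows the producer instead of letting the buffer grow without limit.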

§Components

§Usage

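// Arrow schema types, taken here from DataFusion's re-export of arrow
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use laminar_sql::datafusion::{
    create_streaming_context, ChannelStreamSource, StreamingTableProvider,
};
use std::sync::Arc;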

// Create a streaming context
let ctx = create_streaming_context();

// Create a channel source
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int64, false),
    Field::new("value", DataType::Float64, true),
]));
let source = Arc::new(ChannelStreamSource::new(schema));
let sender = source.sender();

// Register as a table
let provider = StreamingTableProvider::new("events", source);
ctx.register_table("events", Arc::new(provider))?;

// Push data from the Reactor (`batch` is a RecordBatch matching `schema`)
sender.send(batch).await?;

// Execute SQL queries
let df = ctx.sql("SELECT * FROM events WHERE value > 100").await?;

Re-exports§

pub use aggregate_bridge::create_aggregate_factory;
pub use aggregate_bridge::lookup_aggregate_udf;
pub use aggregate_bridge::result_to_scalar_value;
pub use aggregate_bridge::scalar_value_to_result;
pub use aggregate_bridge::DataFusionAccumulatorAdapter;
pub use aggregate_bridge::DataFusionAggregateFactory;
pub use complex_type_lambda::register_lambda_functions;
pub use complex_type_lambda::ArrayFilter;
pub use complex_type_lambda::ArrayReduce;
pub use complex_type_lambda::ArrayTransform;
pub use complex_type_lambda::MapFilter;
pub use complex_type_lambda::MapTransformValues;
pub use complex_type_udf::register_complex_type_functions;
pub use complex_type_udf::MapContainsKey;
pub use complex_type_udf::MapFromArrays;
pub use complex_type_udf::MapKeys;
pub use complex_type_udf::MapValues;
pub use complex_type_udf::StructDrop;
pub use complex_type_udf::StructExtract;
pub use complex_type_udf::StructMerge;
pub use complex_type_udf::StructRename;
pub use complex_type_udf::StructSet;
pub use execute::execute_streaming_sql;
pub use execute::DdlResult;
pub use execute::QueryResult;
pub use execute::StreamingSqlResult;
pub use format_bridge_udf::FromJsonUdf;
pub use format_bridge_udf::ParseEpochUdf;
pub use format_bridge_udf::ParseTimestampUdf;
pub use format_bridge_udf::ToJsonUdf;
pub use json_extensions::register_json_extensions;
pub use json_extensions::JsonInferSchema;
pub use json_extensions::JsonToColumns;
pub use json_extensions::JsonbDeepMerge;
pub use json_extensions::JsonbExcept;
pub use json_extensions::JsonbFlatten;
pub use json_extensions::JsonbMerge;
pub use json_extensions::JsonbPick;
pub use json_extensions::JsonbRenameKeys;
pub use json_extensions::JsonbStripNulls;
pub use json_extensions::JsonbUnflatten;
pub use json_path::CompiledJsonPath;
pub use json_path::JsonPathStep;
pub use json_path::JsonbPathExistsUdf;
pub use json_path::JsonbPathMatchUdf;
pub use json_tvf::register_json_table_functions;
pub use json_tvf::JsonbArrayElementsTextTvf;
pub use json_tvf::JsonbArrayElementsTvf;
pub use json_tvf::JsonbEachTextTvf;
pub use json_tvf::JsonbEachTvf;
pub use json_tvf::JsonbObjectKeysTvf;
pub use json_udaf::JsonAgg;
pub use json_udaf::JsonObjectAgg;
pub use json_udf::JsonBuildArray;
pub use json_udf::JsonBuildObject;
pub use json_udf::JsonTypeof;
pub use json_udf::JsonbContainedBy;
pub use json_udf::JsonbContains;
pub use json_udf::JsonbExists;
pub use json_udf::JsonbExistsAll;
pub use json_udf::JsonbExistsAny;
pub use json_udf::JsonbGet;
pub use json_udf::JsonbGetIdx;
pub use json_udf::JsonbGetPath;
pub use json_udf::JsonbGetPathText;
pub use json_udf::JsonbGetText;
pub use json_udf::JsonbGetTextIdx;
pub use json_udf::ToJsonb;
pub use lookup_join_exec::LookupJoinExec;
pub use lookup_join_exec::LookupJoinExtensionPlanner;
pub use lookup_join_exec::LookupSnapshot;
pub use lookup_join_exec::LookupTableRegistry;
pub use lookup_join_exec::PartialLookupJoinExec;
pub use lookup_join_exec::PartialLookupState;
pub use lookup_join_exec::RegisteredLookup;
pub use lookup_join_exec::VersionedLookupJoinExec;
pub use lookup_join_exec::VersionedLookupState;
pub use proctime_udf::ProcTimeUdf;
pub use watermark_filter::WatermarkDynamicFilter;
pub use watermark_udf::WatermarkUdf;
pub use window_udf::CumulateWindowStart;
pub use window_udf::HopWindowStart;
pub use window_udf::SessionWindowStart;
pub use window_udf::TumbleWindowStart;

Modules§

aggregate_bridge
DataFusion aggregate bridge for streaming aggregation.
complex_type_lambda
Lambda higher-order functions for arrays and maps (F-SCHEMA-015 Tier 3).
complex_type_udf
Array, Struct, and Map scalar UDFs (F-SCHEMA-015).
execute
End-to-end streaming SQL execution.
format_bridge_udf
Format bridge scalar UDFs for inline format conversion (F-SCHEMA-014).
json_extensions
LaminarDB streaming JSON extension UDFs (F-SCHEMA-013).
json_path
SQL/JSON path query compiler and scalar UDFs (F-SCHEMA-012).
json_tvf
JSON table-valued functions for array/object expansion (F-SCHEMA-012).
json_types
JSONB binary format types for JSON UDF evaluation.
json_udaf
PostgreSQL-compatible JSON aggregate UDAFs (F-SCHEMA-011).
json_udf
PostgreSQL-compatible JSON scalar UDFs (F-SCHEMA-011).
lookup_join
LookupJoinNode, a custom DataFusion logical plan node for lookup joins.
lookup_join_exec
Physical execution plan and extension planner for lookup joins.
proctime_udf
Processing-time UDF for PROCTIME() support in the DataFusion integration.
watermark_filter
Dynamic watermark filter for scan-level late-data pruning.
watermark_udf
Watermark UDF for current watermark access in the DataFusion integration.
window_udf
Window function UDFs (TUMBLE, HOP, SESSION, CUMULATE) for the DataFusion integration.
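For readers unfamiliar with TUMBLE and HOP, the window-start arithmetic these functions conventionally rely on can be sketched as follows. This is generic streaming-SQL arithmetic under stated assumptions (millisecond timestamps, windows aligned to the epoch, half-open intervals), not the signatures or exact semantics of the UDFs in `window_udf`; both function names are hypothetical.

```rust
/// Start of the tumbling window `[start, start + size_ms)` that contains
/// `ts_ms`. Assumes windows are aligned to the epoch (an assumption of
/// this sketch).
fn tumble_window_start(ts_ms: i64, size_ms: i64) -> i64 {
    assert!(size_ms > 0, "window size must be positive");
    // `rem_euclid` keeps the remainder non-negative, so timestamps before
    // the epoch still round down to their window start.
    ts_ms - ts_ms.rem_euclid(size_ms)
}

/// Starts of every hopping window of length `size_ms`, advancing by
/// `slide_ms`, that contains `ts_ms`, in ascending order.
fn hop_window_starts(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<i64> {
    assert!(size_ms > 0 && slide_ms > 0, "size and slide must be positive");
    // Latest slide-aligned start that is not after the timestamp.
    let latest = ts_ms - ts_ms.rem_euclid(slide_ms);
    let mut starts = Vec::new();
    let mut start = latest;
    // Walk backwards while `[start, start + size_ms)` still covers `ts_ms`.
    while start + size_ms > ts_ms {
        starts.push(start);
        start -= slide_ms;
    }
    starts.reverse();
    starts
}
```

With a 1-second tumbling window, timestamp 12 345 ms falls in the window starting at 12 000 ms. With a 3-second window sliding every second, the same timestamp belongs to three windows, starting at 10 000, 11 000, and 12 000 ms.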

Structs§

BridgeSender
A cloneable sender for pushing RecordBatch instances into a bridge.
BridgeStream
A stream that pulls RecordBatch instances from the bridge.
ChannelStreamSource
A streaming source that receives data through a channel.
SortColumn
Declares a column’s sort ordering for a streaming source.
StreamBridge
A bridge that connects push-based data producers with pull-based consumers.
StreamingScanExec
A DataFusion execution plan that scans from a streaming source.
StreamingTableProvider
A DataFusion table provider backed by a streaming source.

Enums§

BridgeSendError
Error when sending a batch to the bridge.
BridgeTrySendError
Error when trying to send a batch without blocking.
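The distinction between a blocking send and a non-blocking try-send is easiest to see through the failure modes of the latter. The sketch below uses the standard library's bounded channel as a stand-in; `TrySendOutcome`, `try_push`, and `demo` are hypothetical names, and the actual variants of BridgeTrySendError are not shown on this page, so the mapping is an assumption.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical outcome type for a non-blocking push.
#[derive(Debug, PartialEq)]
enum TrySendOutcome {
    /// The batch was queued.
    Sent,
    /// The buffer is at capacity; the caller may retry later or drop.
    Full,
    /// The receiving side is gone; retrying cannot succeed.
    Closed,
}

/// Attempts to queue a batch without blocking the caller.
fn try_push(tx: &SyncSender<Vec<i64>>, batch: Vec<i64>) -> TrySendOutcome {
    match tx.try_send(batch) {
        Ok(()) => TrySendOutcome::Sent,
        Err(TrySendError::Full(_)) => TrySendOutcome::Full,
        Err(TrySendError::Disconnected(_)) => TrySendOutcome::Closed,
    }
}

/// Exercises all three outcomes on a channel with room for one batch.
fn demo() -> Vec<TrySendOutcome> {
    let (tx, rx) = sync_channel(1);
    let first = try_push(&tx, vec![1]); // fits in the buffer
    let second = try_push(&tx, vec![2]); // buffer already holds one batch
    drop(rx); // the consumer goes away
    let third = try_push(&tx, vec![3]); // nobody left to receive
    vec![first, second, third]
}
```

A hot-path producer would typically prefer the non-blocking form, since a full buffer is reported immediately instead of stalling the caller.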

Traits§

StreamSource
A source of streaming data for DataFusion queries.

Functions§

base_session_config
Returns a base SessionConfig with identifier normalization disabled.
create_session_context
Creates a DataFusion session context with identifier normalization disabled.
create_streaming_context
Creates a DataFusion session context configured for streaming queries.
create_streaming_context_with_validator
Creates a streaming context with a configurable validator mode.
register_json_functions
Registers all PostgreSQL-compatible JSON UDFs and UDAFs with the given SessionContext.
register_streaming_functions
Registers LaminarDB streaming UDFs with a session context.
register_streaming_functions_with_watermark
Registers streaming UDFs with a live watermark source.
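One way to picture a live watermark source is a shared atomic value that the engine advances and that query-side code reads on each evaluation. The sketch below is an assumption-laden illustration, not the crate's design: `SharedWatermark` and `prune_late` are hypothetical, timestamps are milliseconds, and a row counts as late when its event time is strictly below the watermark.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical live watermark shared between the engine and queries.
#[derive(Clone)]
struct SharedWatermark(Arc<AtomicI64>);

impl SharedWatermark {
    fn new(initial_ms: i64) -> Self {
        SharedWatermark(Arc::new(AtomicI64::new(initial_ms)))
    }

    /// Advances the watermark; it never moves backwards.
    fn advance(&self, ts_ms: i64) {
        self.0.fetch_max(ts_ms, Ordering::Relaxed);
    }

    /// Reads the current watermark.
    fn current(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Keeps only event times at or after the current watermark, which is
/// the idea behind scan-level late-data pruning.
fn prune_late(event_times_ms: &[i64], watermark: &SharedWatermark) -> Vec<i64> {
    let wm = watermark.current();
    event_times_ms.iter().copied().filter(|&t| t >= wm).collect()
}
```

Because clones of `SharedWatermark` share the same atomic, a value advanced by the engine is visible to the next `prune_late` call without re-registering anything.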

Type Aliases§

StreamSourceRef
A shared reference to a stream source.