pub struct SqlEngine { /* private fields */ }
Implementations
impl SqlEngine
pub fn new() -> Self
Create a local SQL engine.
Window helper UDFs (tumble_start, tumble_end, hop_start, hop_end)
are registered as part of construction. If registration fails the
engine is still returned — non-window queries work — and a
tracing::warn! is emitted. Use SqlEngine::try_new when callers
need to surface the registration error.
DataFusion target_partitions defaults to 1 (single-threaded local
execution). Use SqlEngine::with_target_parallelism to override.
pub fn new_with_memory_limit(memory_limit_bytes: Option<usize>) -> Self
Create a local SQL engine whose DataFusion execution memory is capped
at memory_limit_bytes.
When Some, the engine runs with a FairSpillPool of that size plus
the default disk manager, so memory-intensive operators (sort, hash
join, aggregation) spill to disk under pressure and queries that cannot
spill fail with a resources-exhausted error instead of exhausting
process memory. None keeps DataFusion’s default unbounded pool.
Shares SqlEngine::new’s fallback behavior for window helper UDF
registration failures.
pub fn try_new() -> SqlResult<Self>
Create a local SQL engine, propagating window helper registration errors.
Callers that need to abort startup when window functions cannot be registered should use this constructor.
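The difference between the lenient and strict constructors can be sketched with a standalone toy type. Engine, register_window_udfs, and the fail flag below are hypothetical stand-ins invented for illustration, and eprintln! stands in for tracing::warn!; this is not the crate's implementation.

```rust
/// Hypothetical stand-in for the engine; it only tracks whether the
/// window helper UDFs were registered.
pub struct Engine {
    pub window_udfs_registered: bool,
}

/// Hypothetical registration step; `fail` simulates a registration error.
fn register_window_udfs(fail: bool) -> Result<(), String> {
    if fail {
        Err("window UDF registration failed".to_string())
    } else {
        Ok(())
    }
}

impl Engine {
    /// Strict constructor: propagates the registration error to the caller.
    pub fn try_new(fail: bool) -> Result<Self, String> {
        register_window_udfs(fail)?;
        Ok(Engine { window_udfs_registered: true })
    }

    /// Lenient constructor: logs a warning and still returns an engine.
    pub fn new(fail: bool) -> Self {
        match register_window_udfs(fail) {
            Ok(()) => Engine { window_udfs_registered: true },
            Err(err) => {
                eprintln!("warning: {err}; window helpers unavailable");
                Engine { window_udfs_registered: false }
            }
        }
    }
}

fn main() {
    // The strict path surfaces the failure; the lenient path degrades.
    assert!(Engine::try_new(true).is_err());
    assert!(!Engine::new(true).window_udfs_registered);
}
```

A service that cannot run without window functions would call the strict constructor at startup and abort on Err.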
pub fn with_in_memory_catalog(
    catalog: Arc<RwLock<InMemoryCatalog>>,
) -> SqlResult<Self>
Create an engine whose krishiv catalog resolves tables registered in InMemoryCatalog (P0-10).
pub fn with_target_parallelism(self, n: NonZeroUsize) -> Self
Set the DataFusion target_partitions parallelism level for this engine.
Higher values allow DataFusion to parallelise hash-join build,
aggregation spilling, and parquet scans across more threads.
Default: 1 (single-threaded). Recommended: available_parallelism().
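One way to obtain the recommended value is the standard library's std::thread::available_parallelism, which already returns a NonZeroUsize. The helper name recommended_parallelism is an assumption made for this sketch; the engine call appears only in a comment because the crate is not in scope here.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Returns the parallelism reported by the OS, falling back to 1 when the
/// query fails (for example in a restricted sandbox).
pub fn recommended_parallelism() -> NonZeroUsize {
    let one = NonZeroUsize::new(1).expect("1 is non-zero");
    thread::available_parallelism().unwrap_or(one)
}

fn main() {
    let n = recommended_parallelism();
    println!("target_partitions candidate: {n}");
    // With the engine in scope, the value would be passed as:
    // let engine = SqlEngine::new().with_target_parallelism(n);
}
```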
pub fn target_parallelism(&self) -> NonZeroUsize
Return the configured target_partitions parallelism level.
pub fn memory_limit_bytes(&self) -> Option<usize>
Return the DataFusion memory pool limit for this engine, if bounded.
pub fn shuffle_partitions(&self) -> Option<u32>
Return the current shuffle.partitions override, if set via SET shuffle.partitions = N.
pub fn table_row_counts(&self) -> Arc<RwLock<HashMap<String, u64>>>
Return access to the table row-count registry.
Populated by register_parquet and register_record_batches with
estimated row counts extracted from table-provider statistics. Used
by SqlDataFrame::krishiv_logical_plan to annotate scan nodes.
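Reading an estimate out of a registry of this shape might look like the sketch below. It assumes the lock is std::sync::RwLock (the page does not say which RwLock the crate uses), and RowCounts and estimated_rows are names invented for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Same shape as the type returned by table_row_counts (std RwLock assumed).
pub type RowCounts = Arc<RwLock<HashMap<String, u64>>>;

/// Look up the estimated row count for `table`, if one was recorded.
pub fn estimated_rows(counts: &RowCounts, table: &str) -> Option<u64> {
    // A poisoned lock is treated as "no estimate" in this sketch.
    let guard = counts.read().ok()?;
    guard.get(table).copied()
}

fn main() {
    let counts: RowCounts = Arc::new(RwLock::new(HashMap::new()));
    counts
        .write()
        .expect("lock not poisoned")
        .insert("orders".to_string(), 1_000);
    println!("orders: {:?}", estimated_rows(&counts, "orders"));
    println!("missing: {:?}", estimated_rows(&counts, "missing"));
}
```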
pub fn registered_table_names(&self) -> Vec<String>
Return table/view names registered in the live DataFusion catalog.
Uses DataFusion’s catalog provider API directly instead of routing
through SHOW TABLES, which requires optional information-schema
support in some DataFusion configurations.
pub fn with_shuffle_partitions(self, n: Option<u32>) -> Self
Set an override for the shuffle partition count.
When n is Some, exchange and shuffle-write operations use n buckets
instead of auto-sizing. Pass None to restore auto-sizing.
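The override semantics reduce to an Option fallback. In the sketch below, effective_partitions and the auto_sized argument are hypothetical; how the engine actually auto-sizes is not described on this page.

```rust
/// Pick the bucket count: the explicit override when present, otherwise
/// whatever the (unspecified) auto-sizing logic proposed.
pub fn effective_partitions(configured: Option<u32>, auto_sized: u32) -> u32 {
    configured.unwrap_or(auto_sized)
}

fn main() {
    // Explicit override wins over the auto-sized value.
    println!("{}", effective_partitions(Some(8), 32));
    // None falls back to auto-sizing.
    println!("{}", effective_partitions(None, 32));
}
```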
pub fn register_streaming_table(
    &self,
    name: &str,
    schema: SchemaRef,
) -> SqlResult<Arc<ContinuousTableInput>>
Register an unbounded continuous table, returning its typed input.
The returned input uses a bounded channel with capacity
crate::streaming::CONTINUOUS_TABLE_CHANNEL_CAPACITY. When the
consumer (the DataFusion query plan) is slower than the producer,
ContinuousTableInput::send(...).await backpressures the producer,
and ContinuousTableInput::try_send(...) returns a resource error
rather than growing memory without limit. Use
SqlEngine::register_streaming_table_with_capacity for a non-default
capacity.
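The non-blocking half of this behaviour can be illustrated with the standard library's bounded channel. This is an analogy only: std::sync::mpsc::sync_channel stands in for the engine's channel, and sends_until_full is a name invented for the sketch.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fill a bounded channel of `capacity` without draining it and report how
/// many sends were accepted before try_send reported that it was full.
pub fn sends_until_full(capacity: usize) -> usize {
    // The receiver is kept alive but never read, simulating a slow consumer.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            // A full channel surfaces as an error instead of buffering more.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => {
                unreachable!("receiver is still alive")
            }
        }
    }
}

fn main() {
    println!("accepted before full: {}", sends_until_full(64));
}
```

The awaiting send has no direct equivalent here; the closest std analogue is SyncSender::send, which blocks the calling thread until the consumer frees a slot.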
pub fn register_streaming_table_with_capacity(
    &self,
    name: &str,
    schema: SchemaRef,
    capacity: usize,
) -> SqlResult<Arc<ContinuousTableInput>>
Same as Self::register_streaming_table but with a caller-supplied
channel capacity. Useful for tests that want to exercise the
full/empty channel boundary without pushing
CONTINUOUS_TABLE_CHANNEL_CAPACITY (64) batches.
pub fn register_kafka_source(
    &self,
    table_name: impl AsRef<str>,
    schema: SchemaRef,
    bootstrap_servers: impl Into<String>,
    topic: impl Into<String>,
    group_id: impl Into<String>,
) -> SqlResult<()>
Register a live Kafka/Redpanda topic as an unbounded streaming table.
This is the native Rust path — no Python bridge or external process
required. Under the hood it creates an rdkafka consumer and wraps it
in a DataFusion StreamingTable so normal SQL queries (SELECT,
GROUP BY, windowed aggregations) work against the live topic.
Equivalent SQL DDL:
CREATE EXTERNAL TABLE <name> (<cols>) STORED AS KAFKA
LOCATION '<topic>'
OPTIONS ('bootstrap.servers' = '…', 'group.id' = '…');
pub async fn sql_to_kafka(
    &self,
    sql: impl AsRef<str>,
    bootstrap_servers: impl Into<String>,
    topic: impl Into<String>,
) -> SqlResult<u64>
Execute a SQL query and write every result row to a Kafka/Redpanda topic.
Each row is serialised as a JSON object using the same format as
KafkaSink. The returned future resolves only after the query stream ends
and the producer queue is flushed, yielding the total number of rows written.
Note: If sql reads from an unbounded streaming table (e.g. one
registered via SqlEngine::register_kafka_source), this call will never return.
Use it with batch sources or add a LIMIT clause.
pub fn with_udf_limits(self, limits: ResourceLimits) -> Self
Configure this engine with explicit UDF resource limits (Track E).
When set, calls to sql() and direct UDF syncs will use these budgets
instead of unlimited defaults. Intended for job-specific engines.
pub fn is_streaming_source(&self, table_name: &str) -> bool
Returns true if table_name is registered as an unbounded streaming source.
pub fn register_streaming_source_name(
    &self,
    table_name: impl Into<String>,
) -> SqlResult<()>
Register a table name as a streaming source without creating a live connector.
This is the test-safe alternative to SqlEngine::register_kafka_source: it marks
table_name in the streaming_sources set so that is_streaming_query
returns true for queries that reference it, without constructing any
broker connection. Useful for unit tests where a live Kafka broker is not
available and rdkafka’s log subsystem is not initialised.
Returns SqlError::EmptyTableName if table_name is blank.
pub fn deregister_streaming_source(&self, name: &str) -> SqlResult<()>
Remove a streaming source registration.
Deregisters the table from DataFusion, removes it from the streaming-sources set, and invalidates the plan cache. Idempotent: deregistering a name that was never registered is not an error.
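The name-only registration contract (blank names rejected, deregistration idempotent) can be sketched with a plain set. StreamingSources and RegistryError are hypothetical stand-ins; the real engine takes &self and also touches DataFusion and the plan cache, which this sketch omits.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for SqlError::EmptyTableName.
#[derive(Debug, PartialEq)]
pub enum RegistryError {
    EmptyTableName,
}

/// Hypothetical stand-in for the engine's streaming-sources set.
#[derive(Default)]
pub struct StreamingSources {
    names: HashSet<String>,
}

impl StreamingSources {
    /// Mark `name` as a streaming source; blank names are rejected.
    pub fn register(&mut self, name: impl Into<String>) -> Result<(), RegistryError> {
        let name = name.into();
        if name.trim().is_empty() {
            return Err(RegistryError::EmptyTableName);
        }
        self.names.insert(name);
        Ok(())
    }

    /// Remove `name`; removing an unknown name is deliberately not an error.
    pub fn deregister(&mut self, name: &str) -> Result<(), RegistryError> {
        self.names.remove(name);
        Ok(())
    }

    pub fn is_streaming_source(&self, name: &str) -> bool {
        self.names.contains(name)
    }
}

fn main() {
    let mut sources = StreamingSources::default();
    sources.register("events").expect("non-blank name");
    // Deregistering twice succeeds both times.
    sources.deregister("events").expect("idempotent");
    sources.deregister("events").expect("idempotent");
    assert!(!sources.is_streaming_source("events"));
}
```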
pub fn live_table_registry(&self) -> &Arc<LiveTableRegistry>
Shared live-table registry for CREATE LIVE TABLE DDL.
pub fn incremental_view_registry(&self) -> &Arc<IncrementalViewRegistry>
Shared incremental-view registry for CREATE INCREMENTAL VIEW DDL.
pub fn pipeline_registry(&self) -> &Arc<PipelineRegistry>
Shared pipeline registry for CREATE SOURCE / CREATE SINK DDL.
pub fn operation_registry(&self) -> &Arc<OperationRegistry>
Shared operation registry for cancellation and progress reporting.
pub fn deregister_table(&self, name: &str) -> SqlResult<()>
Drop a named table from the session context.
Idempotent — dropping a name that was never registered is not an error.
pub fn register_table_udf_fn(
    &self,
    name: impl Into<String>,
    schema: Schema,
    f: impl Fn(&[ScalarValue]) -> Result<RecordBatch, UdfError> + Send + Sync + 'static,
) -> SqlResult<()>
Register a table UDF backed by a Rust closure.
The closure receives literal arguments passed by the SQL caller as
ScalarValue values and returns an Arrow RecordBatch. Non-literal
arguments are rejected because they cannot be evaluated safely at the
synchronous DataFusion table-function boundary. schema describes the
output columns.
Example
engine.register_table_udf_fn(
"generate_ints",
Schema::new(vec![Field::new("n", DataType::Int64, false)]),
|args| {
let count = match args.first() {
Some(ScalarValue::Int64(n)) => *n,
_ => 10,
};
let arr = Int64Array::from((0..count).collect::<Vec<_>>());
Ok(RecordBatch::try_from_iter([("n", Arc::new(arr) as _)])?)
},
)?;
pub fn is_streaming_query(&self, sql: &str) -> SqlResult<bool>
Returns true if any table referenced in sql is a registered streaming source.
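The check amounts to intersecting the tables a query references with the streaming-sources set. The sketch below is deliberately naive: it finds table names by scanning for the token after FROM or JOIN and lowercases them, whereas the engine presumably relies on a real SQL parser. referenced_tables and references_streaming_source are names invented here.

```rust
use std::collections::HashSet;

/// Naively collect the identifier that follows each FROM or JOIN keyword.
/// Subqueries, comma joins, and quoted names are not handled.
pub fn referenced_tables(sql: &str) -> Vec<String> {
    let tokens: Vec<&str> = sql.split_whitespace().collect();
    let mut tables = Vec::new();
    for pair in tokens.windows(2) {
        let keyword = pair[0];
        if keyword.eq_ignore_ascii_case("from") || keyword.eq_ignore_ascii_case("join") {
            // Strip punctuation such as a trailing ';' or ','.
            let name = pair[1]
                .trim_matches(|c: char| !(c.is_alphanumeric() || c == '_' || c == '.'));
            if !name.is_empty() {
                tables.push(name.to_ascii_lowercase());
            }
        }
    }
    tables
}

/// True if any referenced table is in the streaming-sources set.
pub fn references_streaming_source(sql: &str, streaming: &HashSet<String>) -> bool {
    referenced_tables(sql).iter().any(|t| streaming.contains(t))
}

fn main() {
    let mut streaming = HashSet::new();
    streaming.insert("events".to_string());
    let sql = "SELECT * FROM events JOIN users ON events.uid = users.id";
    println!("{}", references_streaming_source(sql, &streaming));
}
```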
pub fn krishiv_catalog(&self) -> Option<&Arc<RwLock<InMemoryCatalog>>>
Shared Krishiv catalog backing this engine, if configured.
pub fn with_udf_registry(self, registry: Arc<RwLock<UdfRegistry>>) -> Self
Share a session UDF registry so scalar UDFs are visible in SQL.
pub fn clear_plan_cache(&self)
Clear the plan cache. Exposed for tests and for external callers that register tables through a different path.
pub async fn sync_scalar_udfs(&self) -> SqlResult<()>
Register all scalar UDFs from the attached registry with DataFusion. Uses unlimited resource defaults for backward compatibility.
pub async fn sync_scalar_udfs_with_limits(
    &self,
    limits: ResourceLimits,
) -> SqlResult<()>
Register scalar UDFs with explicit ResourceLimits for sandbox enforcement. Callers that have a job context (scheduler, runner, api session for a job) should use this and pass limits derived from the JobSpec (memory + time cap). This is the concrete Track E seam from job limits to UDF execution.
pub async fn sync_scalar_udfs_with_limits_for_profile(
    &self,
    limits: ResourceLimits,
    profile: DurabilityProfile,
) -> SqlResult<()>
Register scalar UDFs using a caller-resolved durability profile.
pub async fn sync_scalar_udfs_with_limits_for_policy(
    &self,
    limits: ResourceLimits,
    policy: NativeScalarUdfPolicy,
) -> SqlResult<()>
Register scalar UDFs using a caller-snapshotted policy decision.
pub async fn sync_aggregate_udfs(&self) -> SqlResult<()>
Register aggregate UDFs from the attached registry (P1-21).
pub async fn sync_table_udfs(&self) -> SqlResult<()>
Register table UDFs from the attached registry (P1-21).
pub async fn sync_all_udfs(&self) -> SqlResult<()>
Sync all UDF categories, respecting any limits configured on this engine (Track E).
pub async fn register_parquet(
    &self,
    table_name: impl AsRef<str>,
    path: impl AsRef<Path>,
) -> SqlResult<()>
Register a local Parquet path as a table.
pub async fn read_parquet(
    &self,
    path: impl AsRef<Path>,
) -> SqlResult<SqlDataFrame>
Create a DataFrame by reading a local Parquet path directly.
pub async fn register_record_batches(
    &self,
    table_name: impl AsRef<str>,
    batches: Vec<RecordBatch>,
) -> SqlResult<()>
Register an in-memory table from Arrow record batches.
The schema is inferred from the first batch. Batches that contain no rows
still register an empty table with that schema; when batches itself is
empty there is no schema to infer and the call is a no-op.
pub async fn read_parquet_with_options(
    &self,
    path: impl AsRef<Path>,
    opts: &ParquetReaderOptions,
) -> SqlResult<SqlDataFrame>
Create a DataFrame by reading a local Parquet path with typed options.
pub async fn read_csv(&self, path: impl AsRef<Path>) -> SqlResult<SqlDataFrame>
Create a DataFrame by reading a local CSV path directly.
pub async fn read_csv_with_options(
    &self,
    path: impl AsRef<Path>,
    opts: &CsvReaderOptions,
) -> SqlResult<SqlDataFrame>
Create a DataFrame by reading a local CSV path with typed options.
pub async fn read_json(&self, path: impl AsRef<Path>) -> SqlResult<SqlDataFrame>
Create a DataFrame by reading a local JSON/NDJSON path directly.
pub async fn read_delta(
    &self,
    path: impl AsRef<str>,
    version: Option<i64>,
) -> SqlResult<SqlDataFrame>
Read a local Delta table directory into a DataFrame.
pub async fn read_hudi(
    &self,
    path: impl AsRef<str>,
    query_type: HudiQueryType,
    begin_instant: Option<&str>,
) -> SqlResult<SqlDataFrame>
Read a Hudi table directory.
pub async fn sql(&self, query: impl AsRef<str>) -> SqlResult<SqlDataFrame>
Plan a SQL query with DataFusion.
pub async fn execute_with_timeout(
    &self,
    query: impl AsRef<str> + Send,
    timeout_ms: u64,
) -> SqlResult<SqlDataFrame>
Execute a SQL query with a timeout.
Returns SqlError::Timeout if timeout_ms elapses before the query
produces a result. The underlying DataFusion task is abandoned (not
cancelled at the engine level) when the timeout fires; its resources are
released when the spawned task eventually completes.
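The "abandoned, not cancelled" behaviour can be reproduced with std threads and a channel. This is an analogy under stated assumptions: the engine is async and its internals are not shown on this page, and run_with_timeout and RunError are names invented for the sketch.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for SqlError::Timeout.
#[derive(Debug, PartialEq)]
pub enum RunError {
    Timeout,
}

/// Run `work` on a separate thread and wait at most `timeout_ms` for it.
pub fn run_with_timeout<T, F>(work: F, timeout_ms: u64) -> Result<T, RunError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the caller already gave up, the send fails and the result is
        // dropped; the work itself still ran to completion.
        let _ = tx.send(work());
    });
    // Any receive failure is reported as a timeout here; a panicking worker
    // would also land in this branch, which a real implementation would
    // likely distinguish.
    rx.recv_timeout(Duration::from_millis(timeout_ms))
        .map_err(|_| RunError::Timeout)
}

fn main() {
    let fast = run_with_timeout(|| 7, 1_000);
    println!("fast: {fast:?}");
}
```

Because the worker keeps running after the deadline, a long query continues to hold its resources until it finishes, which is the cost described above.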
pub async fn execute_with_operation_id(
    &self,
    operation_id: u64,
    query: impl AsRef<str> + Send,
    cancelled_ids: &OperationRegistry,
) -> SqlResult<TaggedQueryResult>
Execute a SQL query tagged with a caller-supplied operation ID.
The operation ID is recorded in the returned TaggedQueryResult and
can be used to correlate logs, metrics, and cancellation requests.
If cancelled_ids contains operation_id before execution begins the
function returns SqlError::OperationCancelled immediately.
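The pre-execution cancellation check can be sketched as follows. CancelRegistry, Tagged, ExecError, and run_tagged are hypothetical stand-ins for OperationRegistry, TaggedQueryResult, SqlError, and the engine method; only the "check before running, tag the result" flow is taken from the description above.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Hypothetical stand-in for SqlError::OperationCancelled.
#[derive(Debug, PartialEq)]
pub enum ExecError {
    OperationCancelled(u64),
}

/// Hypothetical stand-in for the operation registry.
#[derive(Default)]
pub struct CancelRegistry {
    cancelled: Mutex<HashSet<u64>>,
}

impl CancelRegistry {
    pub fn cancel(&self, id: u64) {
        self.cancelled.lock().expect("lock not poisoned").insert(id);
    }

    pub fn is_cancelled(&self, id: u64) -> bool {
        self.cancelled.lock().expect("lock not poisoned").contains(&id)
    }
}

/// Result paired with the operation ID it was run under.
#[derive(Debug, PartialEq)]
pub struct Tagged<T> {
    pub operation_id: u64,
    pub value: T,
}

/// Refuse to start work whose ID is already cancelled; otherwise run it
/// and tag the result with the ID.
pub fn run_tagged<T>(
    id: u64,
    registry: &CancelRegistry,
    work: impl FnOnce() -> T,
) -> Result<Tagged<T>, ExecError> {
    if registry.is_cancelled(id) {
        return Err(ExecError::OperationCancelled(id));
    }
    Ok(Tagged { operation_id: id, value: work() })
}

fn main() {
    let registry = CancelRegistry::default();
    registry.cancel(2);
    println!("{:?}", run_tagged(1, &registry, || "rows"));
    println!("{:?}", run_tagged(2, &registry, || "rows"));
}
```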
Trait Implementations
Auto Trait Implementations
impl !RefUnwindSafe for SqlEngine
impl !UnwindSafe for SqlEngine
impl Freeze for SqlEngine
impl Send for SqlEngine
impl Sync for SqlEngine
impl Unpin for SqlEngine
impl UnsafeUnpin for SqlEngine
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request