Struct SqlEngine

pub struct SqlEngine { /* private fields */ }

Implementations

impl SqlEngine

pub fn new() -> Self

Create a local SQL engine.

Window helper UDFs (tumble_start, tumble_end, hop_start, hop_end) are registered as part of construction. If registration fails, the engine is still returned (non-window queries continue to work) and a tracing::warn! is emitted. Use SqlEngine::try_new when callers need to surface the registration error.

DataFusion target_partitions defaults to 1 (single-threaded local execution). Use SqlEngine::with_target_parallelism to override.
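
The difference between the lenient and the strict constructor can be sketched with standard-library types only. Everything below (Engine, RegistrationError, register_window_udfs, the fail flag) is a hypothetical stand-in and not part of this crate; the sketch only illustrates "warn and continue" versus "propagate the error".

```rust
// Hypothetical stand-ins; none of these names come from the crate.
#[derive(Debug, PartialEq)]
struct RegistrationError(String);

#[derive(Debug, Default)]
struct Engine {
    window_udfs_registered: bool,
}

// Pretend registration step; `fail` forces the error path for illustration.
fn register_window_udfs(engine: &mut Engine, fail: bool) -> Result<(), RegistrationError> {
    if fail {
        return Err(RegistrationError("window helper registration failed".to_string()));
    }
    engine.window_udfs_registered = true;
    Ok(())
}

// Lenient constructor: report the problem and still return an engine.
// (The documented engine emits tracing::warn!; eprintln! stands in here.)
fn new_lenient(fail: bool) -> Engine {
    let mut engine = Engine::default();
    if let Err(err) = register_window_udfs(&mut engine, fail) {
        eprintln!("warning: {err:?}");
    }
    engine
}

// Strict constructor: hand the registration error back to the caller.
fn try_new_strict(fail: bool) -> Result<Engine, RegistrationError> {
    let mut engine = Engine::default();
    register_window_udfs(&mut engine, fail)?;
    Ok(engine)
}
```

A service that depends on windowed queries would likely prefer the strict form at startup, while ad hoc tooling can tolerate the lenient one.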

pub fn new_with_memory_limit(memory_limit_bytes: Option<usize>) -> Self

Create a local SQL engine whose DataFusion execution memory is capped at memory_limit_bytes.

When Some, the engine runs with a FairSpillPool of that size plus the default disk manager, so memory-intensive operators (sort, hash join, aggregation) spill to disk under pressure and queries that cannot spill fail with a resources-exhausted error instead of exhausting process memory. None keeps DataFusion’s default unbounded pool.

Shares SqlEngine::new’s fallback behavior for window helper UDF registration failures.

pub fn try_new() -> SqlResult<Self>

Create a local SQL engine, propagating window helper registration errors.

Callers that need to abort startup when window functions cannot be registered should use this constructor.

pub fn with_in_memory_catalog( catalog: Arc<RwLock<InMemoryCatalog>>, ) -> SqlResult<Self>

Create an engine whose krishiv catalog resolves tables registered in InMemoryCatalog (P0-10).

pub fn with_target_parallelism(self, n: NonZeroUsize) -> Self

Set the DataFusion target_partitions parallelism level for this engine.

Higher values allow DataFusion to parallelise hash-join build, aggregation spilling, and parquet scans across more threads. Default: 1 (single-threaded). Recommended: available_parallelism().
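
One way to obtain a value for n is std::thread::available_parallelism, which already returns a NonZeroUsize. The helper names below are illustrative assumptions, and the cap parameter is a hypothetical knob rather than an engine setting.

```rust
use std::num::NonZeroUsize;
use std::thread;

// Resolve a parallelism level, falling back to 1 when the platform
// cannot report how many threads are available.
fn recommended_parallelism() -> NonZeroUsize {
    thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).expect("1 is non-zero"))
}

// Cap the detected value, for example to leave headroom for other work.
fn capped_parallelism(cap: NonZeroUsize) -> NonZeroUsize {
    recommended_parallelism().min(cap)
}
```

Given the signature above, the result could then be passed along as SqlEngine::new().with_target_parallelism(recommended_parallelism()).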

pub fn target_parallelism(&self) -> NonZeroUsize

Return the configured target_partitions parallelism level.

pub fn memory_limit_bytes(&self) -> Option<usize>

Return the DataFusion memory pool limit for this engine, if bounded.

pub fn shuffle_partitions(&self) -> Option<u32>

Return the current shuffle.partitions override, if set via SET shuffle.partitions = N.

pub fn table_row_counts(&self) -> Arc<RwLock<HashMap<String, u64>>>

Return access to the table row-count registry.

Populated by register_parquet and register_record_batches with estimated row counts extracted from table-provider statistics. Used by SqlDataFrame::krishiv_logical_plan to annotate scan nodes.
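
Reading the registry might look like the sketch below. It assumes the RwLock in the return type is std::sync::RwLock (the page does not say which lock type is used), and the helper names are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Same shape as the value returned by table_row_counts(),
// assuming the standard library's RwLock.
type RowCounts = Arc<RwLock<HashMap<String, u64>>>;

// Look up the estimated row count for one table.
// Returns None when no estimate was recorded or the lock is poisoned.
fn estimated_rows(counts: &RowCounts, table: &str) -> Option<u64> {
    let guard = counts.read().ok()?;
    guard.get(table).copied()
}

// Sum the estimates across every table in the registry.
fn total_estimated_rows(counts: &RowCounts) -> u64 {
    counts
        .read()
        .map(|guard| guard.values().sum::<u64>())
        .unwrap_or(0)
}
```

Both helpers hold the read lock only for the duration of the lookup, so they do not block writers for long.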

pub fn registered_table_names(&self) -> Vec<String>

Return table/view names registered in the live DataFusion catalog.

Uses DataFusion’s catalog provider API directly instead of routing through SHOW TABLES, which requires optional information-schema support in some DataFusion configurations.

pub fn with_shuffle_partitions(self, n: Option<u32>) -> Self

Set an override for the shuffle partition count.

When n is Some, exchange and shuffle-write operations use n buckets instead of auto-sizing. Pass None to restore auto-sizing.

pub fn register_streaming_table( &self, name: &str, schema: SchemaRef, ) -> SqlResult<Arc<ContinuousTableInput>>

Register an unbounded continuous table, returning its typed input.

The returned input uses a bounded channel with capacity crate::streaming::CONTINUOUS_TABLE_CHANNEL_CAPACITY. When the consumer (the DataFusion query plan) is slower than the producer, ContinuousTableInput::send(...).await applies backpressure to the producer, and ContinuousTableInput::try_send(...) returns a resource error rather than growing memory without limit. Use SqlEngine::register_streaming_table_with_capacity for a non-default capacity.
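
The bounded-channel behaviour can be illustrated with the standard library's synchronous channel. This is an analogy under stated assumptions, not the crate's implementation: Batch, bounded_input, and try_push are hypothetical names, and the async send(...).await path corresponds here to the blocking SyncSender::send.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// Hypothetical batch type standing in for an Arrow RecordBatch.
type Batch = Vec<i64>;

// Create a bounded channel that buffers at most `capacity` batches.
fn bounded_input(capacity: usize) -> (SyncSender<Batch>, Receiver<Batch>) {
    sync_channel(capacity)
}

// Non-blocking send: report a full channel instead of buffering without limit.
fn try_push(tx: &SyncSender<Batch>, batch: Batch) -> Result<(), String> {
    match tx.try_send(batch) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Err("channel full: consumer is behind".to_string()),
        Err(TrySendError::Disconnected(_)) => Err("consumer dropped".to_string()),
    }
}
```

With a capacity of 2, the third try_push fails until the consumer receives a batch, which is the same trade-off the paragraph above describes.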

pub fn register_streaming_table_with_capacity( &self, name: &str, schema: SchemaRef, capacity: usize, ) -> SqlResult<Arc<ContinuousTableInput>>

Same as Self::register_streaming_table but with a caller-supplied channel capacity. Useful for tests that want to exercise the full/empty channel boundary without pushing CONTINUOUS_TABLE_CHANNEL_CAPACITY (64) batches.

pub fn register_kafka_source( &self, table_name: impl AsRef<str>, schema: SchemaRef, bootstrap_servers: impl Into<String>, topic: impl Into<String>, group_id: impl Into<String>, ) -> SqlResult<()>

Register a live Kafka/Redpanda topic as an unbounded streaming table.

This is the native Rust path — no Python bridge or external process required. Under the hood it creates an rdkafka consumer and wraps it in a DataFusion StreamingTable so normal SQL queries (SELECT, GROUP BY, windowed aggregations) work against the live topic.

Equivalent SQL DDL:

CREATE EXTERNAL TABLE <name> (<cols>) STORED AS KAFKA
  LOCATION '<topic>'
  OPTIONS ('bootstrap.servers' = '…', 'group.id' = '…');

pub async fn sql_to_kafka( &self, sql: impl AsRef<str>, bootstrap_servers: impl Into<String>, topic: impl Into<String>, ) -> SqlResult<u64>

Execute a SQL query and write every result row to a Kafka/Redpanda topic.

Each row is serialised as a JSON object using the same format as KafkaSink. The returned future does not resolve until the query stream ends and the producer queue is flushed; it then yields the total number of rows written.

Note: If sql reads from an unbounded streaming table (e.g. one registered via SqlEngine::register_kafka_source), this call never returns. Use it with batch sources or add a LIMIT clause.

pub fn with_udf_limits(self, limits: ResourceLimits) -> Self

Configure this engine with explicit UDF resource limits (Track E). When set, calls to sql() and direct UDF syncs will use these budgets instead of unlimited defaults. Intended for job-specific engines.

pub fn is_streaming_source(&self, table_name: &str) -> bool

Returns true if table_name is registered as an unbounded streaming source.

pub fn register_streaming_source_name( &self, table_name: impl Into<String>, ) -> SqlResult<()>

Register a table name as a streaming source without creating a live connector.

This is the test-safe alternative to SqlEngine::register_kafka_source: it marks table_name in the streaming_sources set so that is_streaming_query returns true for queries that reference it, without constructing any broker connection. This is useful in unit tests where no live Kafka broker is available and rdkafka’s log subsystem is not initialised. Returns SqlError::EmptyTableName if table_name is blank.

pub fn deregister_streaming_source(&self, name: &str) -> SqlResult<()>

Remove a streaming source registration.

Deregisters the table from DataFusion and removes it from the streaming-sources set. Invalidates the plan cache. Idempotent: deregistering a name that was never registered is not an error.
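
The registration and deregistration semantics described for streaming source names can be modelled with a plain set. The type and method names below are hypothetical; the real engine also touches DataFusion and the plan cache, which this sketch leaves out.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum RegistryError {
    EmptyTableName,
}

// Hypothetical model of a streaming-sources set.
#[derive(Default)]
struct StreamingSources {
    names: HashSet<String>,
}

impl StreamingSources {
    // Mark a name as streaming; blank names are rejected.
    fn register(&mut self, name: &str) -> Result<(), RegistryError> {
        if name.trim().is_empty() {
            return Err(RegistryError::EmptyTableName);
        }
        self.names.insert(name.to_string());
        Ok(())
    }

    // Removing an unknown name is not an error, so repeated calls are safe.
    fn deregister(&mut self, name: &str) {
        self.names.remove(name);
    }

    fn is_streaming_source(&self, name: &str) -> bool {
        self.names.contains(name)
    }
}
```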

pub fn live_table_registry(&self) -> &Arc<LiveTableRegistry>

Shared live-table registry for CREATE LIVE TABLE DDL.

pub fn incremental_view_registry(&self) -> &Arc<IncrementalViewRegistry>

Shared incremental-view registry for CREATE INCREMENTAL VIEW DDL.

pub fn pipeline_registry(&self) -> &Arc<PipelineRegistry>

Shared pipeline registry for CREATE SOURCE / CREATE SINK DDL.

pub fn operation_registry(&self) -> &Arc<OperationRegistry>

Shared operation registry for cancellation and progress reporting.

pub fn deregister_table(&self, name: &str) -> SqlResult<()>

Drop a named table from the session context.

Idempotent — dropping a name that was never registered is not an error.

pub fn register_table_udf_fn( &self, name: impl Into<String>, schema: Schema, f: impl Fn(&[ScalarValue]) -> Result<RecordBatch, UdfError> + Send + Sync + 'static, ) -> SqlResult<()>

Register a table UDF backed by a Rust closure.

The closure receives literal arguments passed by the SQL caller as ScalarValue values and returns an Arrow RecordBatch. Non-literal arguments are rejected because they cannot be evaluated safely at the synchronous DataFusion table-function boundary. schema describes the output columns.

Example
engine.register_table_udf_fn(
    "generate_ints",
    Schema::new(vec![Field::new("n", DataType::Int64, false)]),
    |args| {
        let count = match args.first() {
            Some(ScalarValue::Int64(n)) => *n,
            _ => 10,
        };
        let arr = Int64Array::from((0..count).collect::<Vec<_>>());
        Ok(RecordBatch::try_from_iter([("n", Arc::new(arr) as _)])?)
    },
)?;

pub fn is_streaming_query(&self, sql: &str) -> SqlResult<bool>

Returns true if any table referenced in sql is a registered streaming source.

pub fn krishiv_catalog(&self) -> Option<&Arc<RwLock<InMemoryCatalog>>>

Shared Krishiv catalog backing this engine, if configured.

pub fn with_udf_registry(self, registry: Arc<RwLock<UdfRegistry>>) -> Self

Share a session UDF registry so scalar UDFs are visible in SQL.

pub fn clear_plan_cache(&self)

Expose cache invalidation for tests and external callers that register tables through a different path.

pub async fn sync_scalar_udfs(&self) -> SqlResult<()>

Register all scalar UDFs from the attached registry with DataFusion. Uses unlimited defaults for backward compatibility.

pub async fn sync_scalar_udfs_with_limits( &self, limits: ResourceLimits, ) -> SqlResult<()>

Register scalar UDFs with explicit ResourceLimits for sandbox enforcement. Callers that have a job context (scheduler, runner, or an API session for a job) should use this method and pass limits derived from the JobSpec (memory and time caps). This is the concrete Track E seam from job limits to UDF execution.

pub async fn sync_scalar_udfs_with_limits_for_profile( &self, limits: ResourceLimits, profile: DurabilityProfile, ) -> SqlResult<()>

Register scalar UDFs using a caller-resolved durability profile.

pub async fn sync_scalar_udfs_with_limits_for_policy( &self, limits: ResourceLimits, policy: NativeScalarUdfPolicy, ) -> SqlResult<()>

Register scalar UDFs using a caller-snapshotted policy decision.

pub async fn sync_aggregate_udfs(&self) -> SqlResult<()>

Register aggregate UDFs from the attached registry (P1-21).

pub async fn sync_table_udfs(&self) -> SqlResult<()>

Register table UDFs from the attached registry (P1-21).

pub async fn sync_all_udfs(&self) -> SqlResult<()>

Sync all UDF categories, respecting any limits configured on this engine (Track E).

pub async fn register_parquet( &self, table_name: impl AsRef<str>, path: impl AsRef<Path>, ) -> SqlResult<()>

Register a local Parquet path as a table.

pub async fn read_parquet( &self, path: impl AsRef<Path>, ) -> SqlResult<SqlDataFrame>

Create a DataFrame by reading a local Parquet path directly.

pub async fn register_record_batches( &self, table_name: impl AsRef<str>, batches: Vec<RecordBatch>, ) -> SqlResult<()>

Register an in-memory table from Arrow record batches.

The schema is inferred from the first batch. If batches is empty there is no batch to infer a schema from, and the call is a no-op.

pub async fn read_parquet_with_options( &self, path: impl AsRef<Path>, opts: &ParquetReaderOptions, ) -> SqlResult<SqlDataFrame>

Create a DataFrame by reading a local Parquet path with typed options.

pub async fn read_csv(&self, path: impl AsRef<Path>) -> SqlResult<SqlDataFrame>

Create a DataFrame by reading a local CSV path directly.

pub async fn read_csv_with_options( &self, path: impl AsRef<Path>, opts: &CsvReaderOptions, ) -> SqlResult<SqlDataFrame>

Create a DataFrame by reading a local CSV path with typed options.

pub async fn read_json(&self, path: impl AsRef<Path>) -> SqlResult<SqlDataFrame>

Create a DataFrame by reading a local JSON/NDJSON path directly.

pub async fn read_delta( &self, path: impl AsRef<str>, version: Option<i64>, ) -> SqlResult<SqlDataFrame>

Read a local Delta table directory into a DataFrame.

pub async fn read_hudi( &self, path: impl AsRef<str>, query_type: HudiQueryType, begin_instant: Option<&str>, ) -> SqlResult<SqlDataFrame>

Read a Hudi table directory.

pub async fn sql(&self, query: impl AsRef<str>) -> SqlResult<SqlDataFrame>

Plan a SQL query with DataFusion.

pub async fn execute_with_timeout( &self, query: impl AsRef<str> + Send, timeout_ms: u64, ) -> SqlResult<SqlDataFrame>

Execute a SQL query with a timeout.

Returns SqlError::Timeout if timeout_ms elapses before the query produces a result. The underlying DataFusion task is abandoned (not cancelled at the engine level) when the timeout fires; its resources are released when the spawned task eventually completes.
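
The "abandoned, not cancelled" behaviour can be illustrated with threads and a channel from the standard library. This is only an analogy: the engine itself is async, and run_with_timeout and QueryError are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum QueryError {
    Timeout,
}

// Run `work` on a separate thread and wait at most `timeout_ms` for its result.
// On timeout the worker is not cancelled: it keeps running to completion and
// its result is discarded.
fn run_with_timeout<T, F>(work: F, timeout_ms: u64) -> Result<T, QueryError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The send fails harmlessly if the caller has already given up.
        let _ = tx.send(work());
    });
    // In this sketch a panicked worker is also reported as Timeout.
    rx.recv_timeout(Duration::from_millis(timeout_ms))
        .map_err(|_| QueryError::Timeout)
}
```

Because the work continues after the deadline, a caller that retries aggressively can pile up abandoned work; that cost is worth keeping in mind when choosing timeout_ms.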

pub async fn execute_with_operation_id( &self, operation_id: u64, query: impl AsRef<str> + Send, cancelled_ids: &OperationRegistry, ) -> SqlResult<TaggedQueryResult>

Execute a SQL query tagged with a caller-supplied operation ID.

The operation ID is recorded in the returned TaggedQueryResult and can be used to correlate logs, metrics, and cancellation requests. If cancelled_ids contains operation_id before execution begins the function returns SqlError::OperationCancelled immediately.
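
The pre-execution cancellation check can be sketched as follows. CancelRegistry, Tagged, OpError, and run_tagged are hypothetical stand-ins for OperationRegistry, TaggedQueryResult, SqlError, and execute_with_operation_id; their real fields and methods are not shown on this page.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
enum OpError {
    OperationCancelled(u64),
}

// Hypothetical registry of cancelled operation IDs.
#[derive(Default)]
struct CancelRegistry {
    cancelled: Mutex<HashSet<u64>>,
}

impl CancelRegistry {
    fn cancel(&self, id: u64) {
        self.cancelled.lock().unwrap().insert(id);
    }

    fn is_cancelled(&self, id: u64) -> bool {
        self.cancelled.lock().unwrap().contains(&id)
    }
}

// A result tagged with the caller-supplied operation ID.
#[derive(Debug, PartialEq)]
struct Tagged<T> {
    operation_id: u64,
    value: T,
}

// Check for cancellation before doing any work, then tag the result.
fn run_tagged<T>(
    id: u64,
    registry: &CancelRegistry,
    work: impl FnOnce() -> T,
) -> Result<Tagged<T>, OpError> {
    if registry.is_cancelled(id) {
        return Err(OpError::OperationCancelled(id));
    }
    Ok(Tagged { operation_id: id, value: work() })
}
```

Note that this sketch, like the description above, only checks before execution begins; a cancellation that arrives mid-query is not modelled.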

Trait Implementations

impl Clone for SqlEngine

fn clone(&self) -> SqlEngine

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for SqlEngine

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for SqlEngine

fn default() -> Self

Returns the “default value” for a type.
