Skip to main content

krishiv_sql/
lib.rs

1#![forbid(unsafe_code)]
2
3//! SQL planning and local execution seam for Krishiv.
4//!
5//! This crate owns the DataFusion integration for R1 while keeping DataFusion
6//! out of the long-term public API exposed by `krishiv-api`.
7
8use std::collections::{BTreeSet, HashMap, VecDeque};
9use std::fmt;
10use std::num::NonZeroUsize;
11use std::ops::ControlFlow;
12use std::path::Path;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::{Arc, Mutex, RwLock};
15
16use arrow::datatypes::SchemaRef;
17use arrow::record_batch::RecordBatch;
18use arrow::util::pretty::pretty_format_batches;
19use catalog::{InMemoryCatalog, datafusion_bridge::DataFusionCatalogBridge};
20use datafusion::dataframe::DataFrame as DataFusionDataFrame;
21use datafusion::prelude::{ParquetReadOptions, SessionContext};
22use datafusion::sql::sqlparser::{ast::visit_relations, dialect::GenericDialect, parser::Parser};
23use object_store::aws::AmazonS3Builder;
24
25use krishiv_plan::optimizer::{CostModel, Optimizer};
26use krishiv_plan::{ExecutionKind, LogicalPlan, PlanNode};
27
28pub mod analyze;
29pub mod catalog;
30pub mod cep_sql;
31
32pub mod connector_table;
33pub mod create_function_ddl;
34pub mod grammar;
35pub mod incremental_view;
36pub mod introspection_sql;
37
38pub mod kafka_table;
39pub mod lakehouse;
40pub mod live_table;
41pub mod pipeline_ddl;
42pub mod pivot_sql;
43pub mod recursive_cte;
44/// Spark SQL extensions: LATERAL VIEW, TABLESAMPLE, TRANSFORM, DESCRIBE EXTENDED, etc.
45pub mod spark_sql_ext;
46pub mod sqlstate;
47pub mod subquery;
48pub mod unnest_sql;
49
50pub mod streaming;
51pub mod streaming_tvf;
52pub mod streaming_window_plan;
53mod udf;
54mod window_functions;
55
56pub use cep_sql::{
57    MatchRecognizeStatement, execute_streaming_match_recognize, parse_match_recognize,
58};
59pub use lakehouse::{AsOfTableRef, MergeResult, MergeTargetUnsupportedError, preprocess_as_of_sql};
60
61pub use grammar::{
62    FeatureEntry, FeatureStatus, feature_matrix, features_by_status, features_for_category,
63};
64pub use sqlstate::{SqlStateError, sqlstate_for};
65pub use streaming::{ContinuousInputError, ContinuousTableInput};
66
67/// SQL result alias.
68pub type SqlResult<T> = Result<T, SqlError>;
69
70/// Pinned stream of record batches with typed [`SqlError`] items.
71///
72/// Previously this used `String` as the error type, which lost diagnostic
73/// information at the stream boundary. Callers that need a `String` error can
74/// map with `|e| e.to_string()`.
75pub type SqlStream =
76    std::pin::Pin<Box<dyn futures::stream::Stream<Item = Result<RecordBatch, SqlError>> + Send>>;
77
/// Process-wide sequence used to mint unique ephemeral table names, so that
/// concurrent MERGE/CEP queries do not overwrite each other's result tables.
static EPHEMERAL_TABLE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Return `__{prefix}_{id}`, where `id` is the next value taken from
/// [`EPHEMERAL_TABLE_COUNTER`].
fn next_ephemeral_name(prefix: &str) -> String {
    let sequence = EPHEMERAL_TABLE_COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("__{}_{}", prefix, sequence)
}
86
87// ── Plan cache (single-lock, race-free) ──────────────────────────────────────
88
/// Tells the internal [`SqlEngine`] builder whether to register the helper
/// window UDFs (`tumble_start` / `tumble_end` / `hop_start` / `hop_end`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WindowFnRegistration {
    /// Invoke `window_functions::register_window_functions` and propagate
    /// any error it reports.
    Register,
    /// Do not attempt registration at all, so this step cannot fail. This is
    /// the fallback [`SqlEngine::new`] uses after `Register` has failed, which
    /// keeps the engine usable for non-window queries.
    Skip,
}
100
101/// Bounded query-plan cache keyed by query text.
102///
103/// A single `Mutex<PlanCache>` replaces the previous two-structure approach
104/// (`DashMap` + `Mutex<VecDeque>`) which had a TOCTOU race: two threads could
105/// both see `len() < MAX` and both insert, growing the cache past the limit.
106struct PlanCache {
107    map: HashMap<String, datafusion::logical_expr::LogicalPlan>,
108    order: VecDeque<String>,
109    max: usize,
110}
111
112impl PlanCache {
113    fn new(max: usize) -> Self {
114        Self {
115            map: HashMap::new(),
116            order: VecDeque::new(),
117            max,
118        }
119    }
120
121    fn get(&self, key: &str) -> Option<&datafusion::logical_expr::LogicalPlan> {
122        self.map.get(key)
123    }
124
125    fn insert(&mut self, key: String, plan: datafusion::logical_expr::LogicalPlan) {
126        if self.map.contains_key(&key) {
127            // Remove the stale order entry so a repeated insert doesn't accumulate
128            // duplicate references and corrupt LRU eviction order.
129            self.order.retain(|k| k != &key);
130        } else if self.map.len() >= self.max
131            && let Some(oldest) = self.order.pop_front()
132        {
133            self.map.remove(&oldest);
134        }
135        self.order.push_back(key.clone());
136        self.map.insert(key, plan);
137    }
138
139    fn clear(&mut self) {
140        self.map.clear();
141        self.order.clear();
142    }
143
144    #[cfg(test)]
145    fn is_empty(&self) -> bool {
146        self.map.is_empty()
147    }
148}
149
/// Typed Parquet read options; values are propagated into DataFusion.
#[derive(Debug, Clone, Default)]
pub struct ParquetReaderOptions {
    /// Upper bound on rows per output batch. `None` keeps the DataFusion
    /// default of 8192.
    pub batch_size: Option<usize>,
}
156
/// Typed CSV read options; values are propagated into DataFusion.
#[derive(Debug, Clone, Default)]
pub struct CsvReaderOptions {
    /// Character separating fields. `None` means `,`.
    pub delimiter: Option<char>,
    /// Whether the first row is a header. `None` means `true`.
    pub has_header: Option<bool>,
}
165
/// Typed Parquet write options; values are propagated into the `ArrowWriter`.
#[derive(Debug, Clone, Default)]
pub struct ParquetWriterOptions {
    /// Compression codec name, one of
    /// "snappy" | "zstd" | "gzip" | "lz4" | "brotli" | "uncompressed".
    pub compression: Option<String>,
    /// Upper bound on rows per row group. `None` keeps the `ArrowWriter`
    /// default of 1 048 576.
    pub max_row_group_size: Option<usize>,
}
174
/// Typed CSV write options.
#[derive(Debug, Clone, Default)]
pub struct CsvWriterOptions {
    /// Character separating fields. `None` means `,`.
    pub delimiter: Option<char>,
    /// Whether a header row is written. `None` means `true`.
    pub has_header: Option<bool>,
}
183
184/// SQL-layer errors.
185#[non_exhaustive]
186#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
187pub enum SqlError {
188    /// Query was empty or whitespace only.
189    #[error("SQL query is empty")]
190    EmptyQuery,
191    /// A table name was empty.
192    #[error("table name is empty")]
193    EmptyTableName,
194    /// The requested SQL feature is not available in R1.
195    #[error("unsupported SQL feature: {feature}")]
196    Unsupported { feature: String },
197    /// A table-function declaration or runtime registration was invalid.
198    #[error("invalid table function: {message}")]
199    InvalidTableFunction { message: String },
200    /// DataFusion returned an error.
201    #[error("DataFusion error: {message}")]
202    DataFusion { message: String },
203    /// Krishiv logical-plan optimization failed.
204    #[error(transparent)]
205    Optimizer(#[from] krishiv_plan::optimizer::OptimizerError),
206    /// Access denied by auth or policy check.
207    #[error("access denied: {reason}")]
208    AccessDenied { reason: String },
209    /// A running operation was cancelled by the caller.
210    #[error("operation {operation_id} was cancelled")]
211    OperationCancelled { operation_id: u64 },
212    /// A query exceeded its configured execution timeout.
213    #[error("query timed out after {timeout_ms} ms")]
214    Timeout { timeout_ms: u64 },
215}
216
217impl From<datafusion::error::DataFusionError> for SqlError {
218    fn from(value: datafusion::error::DataFusionError) -> Self {
219        Self::DataFusion {
220            message: value.to_string(),
221        }
222    }
223}
224
225/// SQL planning output.
226#[derive(Debug, Clone, PartialEq, Eq)]
227pub struct SqlPlan {
228    query: String,
229    logical_plan: LogicalPlan,
230}
231
232impl SqlPlan {
233    /// Original query.
234    pub fn query(&self) -> &str {
235        &self.query
236    }
237
238    /// Krishiv logical plan wrapper.
239    pub fn logical_plan(&self) -> &LogicalPlan {
240        &self.logical_plan
241    }
242}
243
// NOTE(review): the overview below describes `SqlEngine`. It used to be a
// rustdoc comment sitting directly above this constant, so rustdoc attached it
// to the constant instead of the engine type. It is kept here as a plain
// comment so the text is not lost.
//
// Local SQL engine backed by DataFusion.
//
// **Local-only**: All SQL execution is in-process via DataFusion. No distributed SQL
// execution path is available in this crate.
// This crate is scoped to R1 — DataFusion will be abstracted behind
// the `KrishivDataFrameOps` trait in future releases.
//
// Methods like `register_parquet`, `read_delta`, and `read_hudi` treat
// path arguments as local filesystem paths. S3/GCS paths require the
// object-store connector layer.

/// Default maximum number of query plans kept in the plan cache; once the
/// cache is full, inserting a new query evicts the oldest cached entry.
const PLAN_CACHE_MAX_ENTRIES: usize = 256;

/// Resolve the plan-cache capacity from `KRISHIV_PLAN_CACHE_MAX_ENTRIES`.
///
/// Surrounding whitespace in the variable is ignored. An unset, unparseable,
/// or zero value falls back to [`PLAN_CACHE_MAX_ENTRIES`].
fn resolve_plan_cache_max_entries() -> usize {
    std::env::var("KRISHIV_PLAN_CACHE_MAX_ENTRIES")
        .ok()
        // `usize::from_str` rejects surrounding whitespace, so trim first;
        // the memory-limit resolver in this file does the same.
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&entries| entries > 0)
        .unwrap_or(PLAN_CACHE_MAX_ENTRIES)
}
/// Default row cap for streaming MATCH_RECOGNIZE.
const STREAMING_CEP_MAX_ROWS_DEFAULT: usize = 100_000;

/// Resolve the streaming MATCH_RECOGNIZE row cap from a raw env var value.
///
/// Surrounding whitespace is ignored. `None` and unparseable values fall back
/// to the documented default of 100_000. Zero is rejected, and also falls
/// back to the default, because it would mean "scan zero rows".
pub fn resolve_streaming_match_recognize_limit(raw: Option<&str>) -> usize {
    // `usize::from_str` rejects surrounding whitespace, so trim first; this
    // matches how `resolve_query_memory_limit_bytes` treats its input.
    raw.and_then(|text| text.trim().parse::<usize>().ok())
        .filter(|&rows| rows > 0)
        .unwrap_or(STREAMING_CEP_MAX_ROWS_DEFAULT)
}
274
275/// Resolve the streaming MATCH_RECOGNIZE row cap from the
276/// `KRISHIV_MATCH_RECOGNIZE_STREAMING_LIMIT` environment variable.
277pub fn streaming_match_recognize_limit_from_env() -> usize {
278    resolve_streaming_match_recognize_limit(
279        std::env::var("KRISHIV_MATCH_RECOGNIZE_STREAMING_LIMIT")
280            .ok()
281            .as_deref(),
282    )
283}
284
/// Resolve a per-engine DataFusion memory limit from a raw env var value.
///
/// Surrounding whitespace is ignored. `None`, unparseable, and zero inputs
/// all yield `None`, meaning "no limit" (the engine runs with DataFusion's
/// default unbounded pool).
pub fn resolve_query_memory_limit_bytes(raw: Option<&str>) -> Option<usize> {
    let text = raw?;
    match text.trim().parse::<usize>() {
        Ok(0) | Err(_) => None,
        Ok(limit) => Some(limit),
    }
}
292
293/// Resolve the default per-engine memory limit from the
294/// `KRISHIV_QUERY_MEMORY_LIMIT_BYTES` environment variable.
295pub fn query_memory_limit_from_env() -> Option<usize> {
296    resolve_query_memory_limit_bytes(
297        std::env::var("KRISHIV_QUERY_MEMORY_LIMIT_BYTES")
298            .ok()
299            .as_deref(),
300    )
301}
302
/// Resolve the batch size from the `KRISHIV_BATCH_SIZE` env var.
///
/// Surrounding whitespace in the variable is ignored. Falls back to
/// DataFusion's default (8192) if the variable is unset, unparseable, or zero.
pub fn batch_size_from_env() -> usize {
    std::env::var("KRISHIV_BATCH_SIZE")
        .ok()
        // `usize::from_str` rejects surrounding whitespace, so trim first;
        // the memory-limit resolver in this file does the same.
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&rows| rows > 0)
        .unwrap_or(8192)
}
313
/// Resolve the default parallelism from the `KRISHIV_TARGET_PARALLELISM` env var.
///
/// Surrounding whitespace in the variable is ignored. If the variable is
/// unset, unparseable, or zero, falls back to
/// `std::thread::available_parallelism()`, and to 1 when that query fails.
pub fn default_parallelism_from_env() -> NonZeroUsize {
    std::env::var("KRISHIV_TARGET_PARALLELISM")
        .ok()
        // `usize::from_str` rejects surrounding whitespace, so trim first;
        // the memory-limit resolver in this file does the same.
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // `NonZeroUsize::new(0)` is `None`, which routes zero to the fallback.
        .and_then(NonZeroUsize::new)
        .unwrap_or_else(|| std::thread::available_parallelism().unwrap_or(NonZeroUsize::MIN))
}
324
325/// Build the DataFusion session config with a configurable parallelism level.
326///
327/// When `target_partitions > 1`, round-robin repartitioning is enabled so
328/// DataFusion can balance work across threads for hash-join build,
329/// aggregation spill, and parquet scan parallelism.
330///
331/// `execution.batch_size` is set from `KRISHIV_BATCH_SIZE` (default: 8192).
332fn build_single_node_session_config(
333    target_partitions: NonZeroUsize,
334) -> datafusion::prelude::SessionConfig {
335    let tp = target_partitions.get();
336    let batch_size = batch_size_from_env();
337    let mut config = datafusion::prelude::SessionConfig::new()
338        .with_target_partitions(tp)
339        .with_batch_size(batch_size)
340        .set_bool(
341            "datafusion.optimizer.enable_round_robin_repartition",
342            tp > 1,
343        );
344    config.options_mut().execution.parquet.pushdown_filters = true;
345    config.options_mut().execution.parquet.enable_page_index = true;
346    config
347}
348
349#[derive(Clone)]
350pub struct SqlEngine {
351    context: SessionContext,
352    target_parallelism: NonZeroUsize,
353    krishiv_catalog: Option<Arc<RwLock<InMemoryCatalog>>>,
354    udf_registry: Option<std::sync::Arc<std::sync::RwLock<krishiv_plan::udf::UdfRegistry>>>,
355    /// Table names registered as unbounded streaming sources.
356    /// Wrapped in `Arc<RwLock<>>` so that Session clones share the same set.
357    streaming_sources: Arc<RwLock<std::collections::HashSet<String>>>,
358    /// Serializes streaming table name validation and catalog registration.
359    streaming_registration: Arc<Mutex<()>>,
360    /// `true` once any streaming source has been registered.  Checked with a
361    /// relaxed atomic load before acquiring `streaming_sources` so that the
362    /// common case (no streaming sources, pure batch workload) avoids both the
363    /// lock and the SQL parse inside `is_streaming_query`.
364    has_streaming_sources: Arc<AtomicBool>,
365    /// Optional UDF resource limits to apply when syncing UDFs for this engine.
366    /// Set for job-specific engines so sandbox enforcement uses the job's budgets.
367    udf_limits: Option<krishiv_plan::udf::ResourceLimits>,
368    /// Monotonically increasing version counter; incremented on every UDF
369    /// registration or removal. Used to skip `sync_all_udfs()` when nothing
370    /// has changed since the last sync.
371    udf_registry_version: Arc<AtomicU64>,
372    /// The version at which the last `sync_all_udfs()` was performed.
373    /// Compared against `udf_registry_version` to detect staleness.
374    udf_last_synced_version: Arc<AtomicU64>,
375    /// Bounded query plan cache: query text → DataFusion LogicalPlan.
376    /// Skips re-parsing and re-optimising identical repeated queries.
377    /// Max `PLAN_CACHE_MAX_ENTRIES` entries; oldest entry evicted when full.
378    /// Single-lock design prevents the TOCTOU race of the previous two-structure
379    /// (`DashMap` + `VecDeque`) implementation.
380    plan_cache: Arc<Mutex<PlanCache>>,
381    /// Override for shuffle partition count (`SET shuffle.partitions = N`).
382    /// When `Some`, exchange nodes use this bucket count instead of auto-sizing.
383    shuffle_partitions: Arc<std::sync::RwLock<Option<u32>>>,
384    /// Estimated row counts for registered tables, keyed by table name.
385    /// Populated by `register_parquet` and `register_record_batches`.
386    /// Used by `krishiv_logical_plan` to annotate scan nodes for the
387    /// `BroadcastAutoRule` optimizer.
388    table_row_counts: Arc<std::sync::RwLock<HashMap<String, u64>>>,
389    /// DataFusion memory pool limit in bytes for this engine, when bounded.
390    /// `None` means the default unbounded pool. When `Some`, the engine runs
391    /// with a `FairSpillPool` so sorts, hash joins, and aggregations spill to
392    /// disk under memory pressure instead of growing without bound.
393    memory_limit_bytes: Option<usize>,
394    /// Iceberg catalogs registered via `with_iceberg_catalog`, keyed by their
395    /// DataFusion catalog name. Stored so that `CALL system.<proc>` statements
396    /// can dispatch maintenance operations to the right catalog.
397    #[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
398    iceberg_catalogs: Arc<std::sync::RwLock<Vec<(Arc<catalog::unified::KrishivCatalog>, String)>>>,
399    /// Live-table DDL registry shared across SQL and session APIs.
400    live_table_registry: Arc<live_table::LiveTableRegistry>,
401    /// Incremental-view DDL registry shared across SQL and session APIs.
402    incremental_view_registry: Arc<incremental_view::IncrementalViewRegistry>,
403    /// Pipeline DDL registry (CREATE SOURCE / CREATE SINK metadata).
404    pipeline_registry: Arc<pipeline_ddl::PipelineRegistry>,
405    /// Cancelled operation IDs and progress snapshots for query lifecycle control.
406    operation_registry: Arc<OperationRegistry>,
407}
408
409impl fmt::Debug for SqlEngine {
410    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411        f.debug_struct("SqlEngine")
412            .field("backend", &"datafusion")
413            .finish_non_exhaustive()
414    }
415}
416
417impl Default for SqlEngine {
418    fn default() -> Self {
419        Self::new()
420    }
421}
422
423impl SqlEngine {
424    /// Create a local SQL engine.
425    ///
426    /// Window helper UDFs (`tumble_start`, `tumble_end`, `hop_start`, `hop_end`)
427    /// are registered as part of construction. If registration fails the
428    /// engine is still returned — non-window queries work — and a
429    /// `tracing::warn!` is emitted. Use [`SqlEngine::try_new`] when callers
430    /// need to surface the registration error.
431    ///
432    /// DataFusion `target_partitions` defaults to 1 (single-threaded local
433    /// execution). Use [`SqlEngine::with_target_parallelism`] to override.
434    pub fn new() -> Self {
435        Self::new_with_memory_limit(query_memory_limit_from_env())
436    }
437
438    /// Create a local SQL engine whose DataFusion execution memory is capped
439    /// at `memory_limit_bytes`.
440    ///
441    /// When `Some`, the engine runs with a `FairSpillPool` of that size plus
442    /// the default disk manager, so memory-intensive operators (sort, hash
443    /// join, aggregation) spill to disk under pressure and queries that cannot
444    /// spill fail with a resources-exhausted error instead of exhausting
445    /// process memory. `None` keeps DataFusion's default unbounded pool.
446    ///
447    /// Shares [`SqlEngine::new`]'s fallback behavior for window helper UDF
448    /// registration failures.
449    pub fn new_with_memory_limit(memory_limit_bytes: Option<usize>) -> Self {
450        match Self::build_local(
451            None,
452            WindowFnRegistration::Register,
453            NonZeroUsize::MIN,
454            memory_limit_bytes,
455        ) {
456            Ok(engine) => engine,
457            Err(err) => {
458                tracing::warn!(
459                    error = %err,
460                    "SqlEngine::new: window helper UDF registration failed; \
461                     window SQL functions will be unavailable, other queries are unaffected"
462                );
463                Self::build_local(
464                    None,
465                    WindowFnRegistration::Skip,
466                    NonZeroUsize::MIN,
467                    memory_limit_bytes,
468                )
469                .unwrap_or_else(|err| {
470                    tracing::error!(
471                        error = %err,
472                        "memory-limited DataFusion runtime construction failed; \
473                         falling back to an unbounded engine"
474                    );
475                    Self::build_local(None, WindowFnRegistration::Skip, NonZeroUsize::MIN, None)
476                        .unwrap_or_else(|_| Self::build_absolute_minimal(NonZeroUsize::MIN))
477                })
478            }
479        }
480    }
481
482    /// Create a local SQL engine, propagating window helper registration errors.
483    ///
484    /// Callers that need to abort startup when window functions cannot be
485    /// registered should use this constructor.
486    pub fn try_new() -> SqlResult<Self> {
487        Self::build_local(
488            None,
489            WindowFnRegistration::Register,
490            NonZeroUsize::MIN,
491            query_memory_limit_from_env(),
492        )
493    }
494
495    /// Create an engine whose `krishiv` catalog resolves tables registered in `InMemoryCatalog` (P0-10).
496    pub fn with_in_memory_catalog(catalog: Arc<RwLock<InMemoryCatalog>>) -> SqlResult<Self> {
497        if krishiv_common::profile_requires_fail_closed_metadata(
498            krishiv_common::resolve_durability_profile(),
499        ) {
500            return Err(SqlError::DataFusion {
501                message: String::from(
502                    "InMemoryCatalog is dev-only; configure a durable REST or file-backed \
503                     catalog for production deployments",
504                ),
505            });
506        }
507        Self::build_local(
508            Some(catalog),
509            WindowFnRegistration::Register,
510            NonZeroUsize::MIN,
511            query_memory_limit_from_env(),
512        )
513    }
514
515    /// Set the DataFusion `target_partitions` parallelism level for this engine.
516    ///
517    /// Higher values allow DataFusion to parallelise hash-join build,
518    /// aggregation spilling, and parquet scans across more threads.
519    /// Default: 1 (single-threaded). Recommended: `available_parallelism()`.
520    #[must_use]
521    pub fn with_target_parallelism(mut self, n: NonZeroUsize) -> Self {
522        self.target_parallelism = n;
523        self
524    }
525
526    /// Return the configured `target_partitions` parallelism level.
527    pub fn target_parallelism(&self) -> NonZeroUsize {
528        self.target_parallelism
529    }
530
531    /// Return the DataFusion memory pool limit for this engine, if bounded.
532    pub fn memory_limit_bytes(&self) -> Option<usize> {
533        self.memory_limit_bytes
534    }
535
536    /// Return the current `shuffle.partitions` override, if set via `SET shuffle.partitions = N`.
537    pub fn shuffle_partitions(&self) -> Option<u32> {
538        *self
539            .shuffle_partitions
540            .read()
541            .unwrap_or_else(|e| e.into_inner())
542    }
543
544    /// Return access to the table row-count registry.
545    ///
546    /// Populated by `register_parquet` and `register_record_batches` with
547    /// estimated row counts extracted from table-provider statistics. Used
548    /// by `SqlDataFrame::krishiv_logical_plan` to annotate scan nodes.
549    pub fn table_row_counts(&self) -> Arc<std::sync::RwLock<HashMap<String, u64>>> {
550        Arc::clone(&self.table_row_counts)
551    }
552
553    /// Return table/view names registered in the live DataFusion catalog.
554    ///
555    /// Uses DataFusion's catalog provider API directly instead of routing
556    /// through `SHOW TABLES`, which requires optional information-schema
557    /// support in some DataFusion configurations.
558    pub fn registered_table_names(&self) -> Vec<String> {
559        let mut names = Vec::new();
560        for catalog_name in self.context.catalog_names() {
561            let Some(catalog) = self.context.catalog(&catalog_name) else {
562                continue;
563            };
564            for schema_name in catalog.schema_names() {
565                let Some(schema) = catalog.schema(&schema_name) else {
566                    continue;
567                };
568                names.extend(schema.table_names());
569            }
570        }
571        names.sort();
572        names.dedup();
573        names
574    }
575
576    /// Build a `SqlDataFrame` with this engine's shared session context attached
577    /// so that `cache()` / `create_or_replace_temp_view()` work on the live session.
578    fn make_sql_df(&self, name: &str, dataframe: DataFusionDataFrame) -> SqlDataFrame {
579        SqlDataFrame::new(name, dataframe, self.table_row_counts())
580            .with_context(self.context.clone())
581    }
582
583    /// Attach SQL text and execution kind derived from registered streaming sources.
584    fn attach_query_metadata(&self, df: SqlDataFrame, query: &str) -> SqlDataFrame {
585        let kind = if self.is_streaming_query(query).unwrap_or(false) {
586            ExecutionKind::Streaming
587        } else {
588            ExecutionKind::Batch
589        };
590        df.with_query(query).with_execution_kind(kind)
591    }
592
593    /// Set an override for the shuffle partition count.
594    ///
595    /// When `n` is `Some`, exchange and shuffle-write operations use `n` buckets
596    /// instead of auto-sizing. Pass `None` to restore auto-sizing.
597    #[must_use]
598    pub fn with_shuffle_partitions(self, n: Option<u32>) -> Self {
599        if let Ok(mut guard) = self.shuffle_partitions.write() {
600            *guard = n;
601        }
602        self
603    }
604
605    /// Internal builder shared by the public constructors.
606    ///
607    /// `krishiv_catalog` is `Some(...)` when the engine should bridge to an
608    /// `InMemoryCatalog`; `None` for a default engine.
609    ///
610    /// `window_fn_registration` controls whether the helper UDFs
611    /// (`tumble_start` / `tumble_end` / `hop_start` / `hop_end`) are
612    /// registered. `Skip` is used as a fallback by [`SqlEngine::new`] when
613    /// `Register` fails; it is infallible.
614    fn build_local(
615        krishiv_catalog: Option<Arc<RwLock<InMemoryCatalog>>>,
616        window_fn_registration: WindowFnRegistration,
617        target_partitions: NonZeroUsize,
618        memory_limit_bytes: Option<usize>,
619    ) -> SqlResult<Self> {
620        // Create streaming_sources first so it can be shared with KafkaTableFactory.
621        // DDL-created Kafka tables (CREATE EXTERNAL TABLE … STORED AS KAFKA) then
622        // correctly register in is_streaming_query.
623        let streaming_sources: Arc<RwLock<std::collections::HashSet<String>>> =
624            Arc::new(RwLock::new(std::collections::HashSet::new()));
625
626        let dummy_state = datafusion::execution::session_state::SessionStateBuilder::new()
627            .with_default_features()
628            .build();
629        let mut table_factories = dummy_state.table_factories().clone();
630        crate::connector_table::register_connector_table_factories(
631            &mut table_factories,
632            streaming_sources.clone(),
633        );
634        let mut state_builder = datafusion::execution::session_state::SessionStateBuilder::new()
635            .with_default_features()
636            .with_config(build_single_node_session_config(target_partitions))
637            .with_table_factories(table_factories);
638        if let Some(limit) = memory_limit_bytes {
639            // A FairSpillPool shares the limit across concurrently running
640            // operators and lets spill-capable operators (sort, hash join,
641            // aggregation) write to the default disk manager's temp files
642            // instead of failing outright when the pool is exhausted.
643            let runtime_env = datafusion::execution::runtime_env::RuntimeEnvBuilder::new()
644                .with_memory_pool(Arc::new(
645                    datafusion::execution::memory_pool::FairSpillPool::new(limit),
646                ))
647                .build_arc()
648                .map_err(|e| SqlError::DataFusion {
649                    message: format!(
650                        "failed to build memory-limited DataFusion runtime \
651                         (limit {limit} bytes): {e}"
652                    ),
653                })?;
654            state_builder = state_builder.with_runtime_env(runtime_env);
655        }
656        let state = state_builder.build();
657        let context = SessionContext::new_with_state(state);
658        if let Some(catalog) = &krishiv_catalog {
659            context.register_catalog(
660                "krishiv",
661                Arc::new(DataFusionCatalogBridge::new(catalog.clone())),
662            );
663        }
664        if matches!(window_fn_registration, WindowFnRegistration::Register) {
665            window_functions::register_window_functions(&context).map_err(|e| {
666                SqlError::DataFusion {
667                    message: format!("failed to register window helper UDFs: {e}"),
668                }
669            })?;
670        }
671        Ok(Self {
672            context,
673            target_parallelism: target_partitions,
674            krishiv_catalog,
675            udf_registry: None,
676            streaming_sources,
677            streaming_registration: Arc::new(Mutex::new(())),
678            has_streaming_sources: Arc::new(AtomicBool::new(false)),
679            udf_limits: None,
680            udf_registry_version: Arc::new(AtomicU64::new(0)),
681            udf_last_synced_version: Arc::new(AtomicU64::new(u64::MAX)),
682            plan_cache: Arc::new(Mutex::new(PlanCache::new(resolve_plan_cache_max_entries()))),
683            shuffle_partitions: Arc::new(std::sync::RwLock::new(None)),
684            table_row_counts: Arc::new(std::sync::RwLock::new(HashMap::new())),
685            memory_limit_bytes,
686            #[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
687            iceberg_catalogs: Arc::new(std::sync::RwLock::new(Vec::new())),
688            live_table_registry: Arc::new(live_table::LiveTableRegistry::new()),
689            incremental_view_registry: Arc::new(incremental_view::IncrementalViewRegistry::new()),
690            pipeline_registry: Arc::new(pipeline_ddl::PipelineRegistry::new()),
691            operation_registry: Arc::new(OperationRegistry::new()),
692        })
693    }
694
695    /// Build the absolute minimal engine: no catalog, no window UDFs, no memory
696    /// limit. Every step is infallible, so the return type is `Self`. Used as
697    /// the last-resort fallback in `new_with_memory_limit`.
698    fn build_absolute_minimal(target_partitions: NonZeroUsize) -> Self {
699        let streaming_sources: Arc<RwLock<std::collections::HashSet<String>>> =
700            Arc::new(RwLock::new(std::collections::HashSet::new()));
701        let dummy_state = datafusion::execution::session_state::SessionStateBuilder::new()
702            .with_default_features()
703            .build();
704        let mut table_factories = dummy_state.table_factories().clone();
705        crate::connector_table::register_connector_table_factories(
706            &mut table_factories,
707            streaming_sources.clone(),
708        );
709        let state = datafusion::execution::session_state::SessionStateBuilder::new()
710            .with_default_features()
711            .with_config(build_single_node_session_config(target_partitions))
712            .with_table_factories(table_factories)
713            .build();
714        let context = SessionContext::new_with_state(state);
715        Self {
716            context,
717            target_parallelism: target_partitions,
718            krishiv_catalog: None,
719            udf_registry: None,
720            streaming_sources,
721            streaming_registration: Arc::new(Mutex::new(())),
722            has_streaming_sources: Arc::new(AtomicBool::new(false)),
723            udf_limits: None,
724            udf_registry_version: Arc::new(AtomicU64::new(0)),
725            udf_last_synced_version: Arc::new(AtomicU64::new(u64::MAX)),
726            plan_cache: Arc::new(Mutex::new(PlanCache::new(resolve_plan_cache_max_entries()))),
727            shuffle_partitions: Arc::new(std::sync::RwLock::new(None)),
728            table_row_counts: Arc::new(std::sync::RwLock::new(HashMap::new())),
729            memory_limit_bytes: None,
730            #[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
731            iceberg_catalogs: Arc::new(std::sync::RwLock::new(Vec::new())),
732            live_table_registry: Arc::new(live_table::LiveTableRegistry::new()),
733            incremental_view_registry: Arc::new(incremental_view::IncrementalViewRegistry::new()),
734            pipeline_registry: Arc::new(pipeline_ddl::PipelineRegistry::new()),
735            operation_registry: Arc::new(OperationRegistry::new()),
736        }
737    }
738
739    /// Register an unbounded continuous table, returning its typed input.
740    ///
741    /// The returned input uses a bounded channel with capacity
742    /// [`crate::streaming::CONTINUOUS_TABLE_CHANNEL_CAPACITY`]. When the
743    /// consumer (the DataFusion query plan) is slower than the producer,
744    /// `ContinuousTableInput::send(...).await` backpressures the producer,
745    /// and `ContinuousTableInput::try_send(...)` returns a resource error
746    /// rather than growing memory without limit. Use
747    /// [`register_streaming_table_with_capacity`] for a non-default
748    /// capacity.
749    pub fn register_streaming_table(
750        &self,
751        name: &str,
752        schema: arrow::datatypes::SchemaRef,
753    ) -> SqlResult<Arc<ContinuousTableInput>> {
754        let _registration = self.lock_streaming_registration()?;
755        self.validate_new_streaming_table(name, &schema)?;
756        let (table, input) = crate::streaming::create_continuous_table(schema).map_err(|e| {
757            SqlError::DataFusion {
758                message: e.to_string(),
759            }
760        })?;
761        self.register_new_streaming_provider(name, table)?;
762        self.streaming_sources
763            .write()
764            .unwrap_or_else(|e| e.into_inner())
765            .insert(name.to_string());
766        self.has_streaming_sources.store(true, Ordering::Release);
767        self.invalidate_plan_cache();
768        Ok(input)
769    }
770
771    /// Same as [`Self::register_streaming_table`] but with a caller-supplied
772    /// channel capacity. Useful for tests that want to exercise the
773    /// full/empty channel boundary without pushing
774    /// `CONTINUOUS_TABLE_CHANNEL_CAPACITY` (64) batches.
775    pub fn register_streaming_table_with_capacity(
776        &self,
777        name: &str,
778        schema: arrow::datatypes::SchemaRef,
779        capacity: usize,
780    ) -> SqlResult<Arc<ContinuousTableInput>> {
781        let _registration = self.lock_streaming_registration()?;
782        self.validate_new_streaming_table(name, &schema)?;
783        let (table, input) = crate::streaming::create_continuous_table_with_capacity(
784            schema, capacity,
785        )
786        .map_err(|e| SqlError::DataFusion {
787            message: e.to_string(),
788        })?;
789        self.register_new_streaming_provider(name, table)?;
790        self.streaming_sources
791            .write()
792            .unwrap_or_else(|e| e.into_inner())
793            .insert(name.to_string());
794        self.has_streaming_sources.store(true, Ordering::Release);
795        self.invalidate_plan_cache();
796        Ok(input)
797    }
798
799    fn lock_streaming_registration(&self) -> SqlResult<std::sync::MutexGuard<'_, ()>> {
800        self.streaming_registration
801            .lock()
802            .map_err(|error| SqlError::DataFusion {
803                message: format!("streaming table registration lock poisoned: {error}"),
804            })
805    }
806
807    fn validate_new_streaming_table(
808        &self,
809        name: &str,
810        schema: &arrow::datatypes::SchemaRef,
811    ) -> SqlResult<()> {
812        if name.trim().is_empty() {
813            return Err(SqlError::EmptyTableName);
814        }
815        if schema.fields().is_empty() {
816            return Err(SqlError::DataFusion {
817                message: "streaming table schema must contain at least one field".into(),
818            });
819        }
820        if self
821            .context
822            .table_exist(name)
823            .map_err(|error| SqlError::DataFusion {
824                message: error.to_string(),
825            })?
826        {
827            return Err(SqlError::DataFusion {
828                message: format!("table '{name}' is already registered"),
829            });
830        }
831        Ok(())
832    }
833
834    fn register_new_streaming_provider(
835        &self,
836        name: &str,
837        table: Arc<dyn datafusion::catalog::TableProvider>,
838    ) -> SqlResult<()> {
839        let previous =
840            self.context
841                .register_table(name, table)
842                .map_err(|error| SqlError::DataFusion {
843                    message: error.to_string(),
844                })?;
845        if let Some(previous) = previous {
846            self.context
847                .register_table(name, previous)
848                .map_err(|error| SqlError::DataFusion {
849                    message: format!(
850                        "table '{name}' was concurrently registered and could not be restored: \
851                         {error}"
852                    ),
853                })?;
854            return Err(SqlError::DataFusion {
855                message: format!("table '{name}' was concurrently registered"),
856            });
857        }
858        Ok(())
859    }
860
861    /// Register a live Kafka/Redpanda topic as an unbounded streaming table.
862    ///
863    /// This is the native Rust path — no Python bridge or external process
864    /// required.  Under the hood it creates an `rdkafka` consumer and wraps it
865    /// in a DataFusion `StreamingTable` so normal SQL queries (`SELECT`,
866    /// `GROUP BY`, windowed aggregations) work against the live topic.
867    ///
868    /// Equivalent SQL DDL:
869    /// ```sql
870    /// CREATE EXTERNAL TABLE <name> (<cols>) STORED AS KAFKA
871    ///   LOCATION '<topic>'
872    ///   OPTIONS ('bootstrap.servers' = '…', 'group.id' = '…');
873    /// ```
874    pub fn register_kafka_source(
875        &self,
876        table_name: impl AsRef<str>,
877        schema: arrow::datatypes::SchemaRef,
878        bootstrap_servers: impl Into<String>,
879        topic: impl Into<String>,
880        group_id: impl Into<String>,
881    ) -> SqlResult<()> {
882        let table_name = table_name.as_ref();
883        if table_name.trim().is_empty() {
884            return Err(SqlError::EmptyTableName);
885        }
886        let config = krishiv_connectors::kafka::KafkaConfig {
887            bootstrap_servers: bootstrap_servers.into(),
888            topic: topic.into(),
889            group_id: group_id.into(),
890            auto_commit_interval_ms: {
891                let profile = krishiv_common::resolve_durability_profile();
892                if krishiv_common::requires_manual_kafka_commit(profile) {
893                    None
894                } else {
895                    Some(1_000)
896                }
897            },
898            security_protocol: None,
899            ssl_ca_location: None,
900            ssl_certificate_location: None,
901            ssl_key_location: None,
902            ssl_key_password: None,
903            sasl_username: None,
904            sasl_password: None,
905            sasl_mechanisms: None,
906            enable_idempotence: None,
907            transactional_id: None,
908        };
909        let table =
910            crate::kafka_table::create_kafka_streaming_table(schema, config).map_err(|e| {
911                SqlError::DataFusion {
912                    message: e.to_string(),
913                }
914            })?;
915        if self
916            .context
917            .table_exist(table_name)
918            .map_err(SqlError::from)?
919        {
920            let _ = self
921                .context
922                .deregister_table(table_name)
923                .map_err(SqlError::from)?;
924        }
925        self.context
926            .register_table(table_name, table)
927            .map_err(|e| SqlError::DataFusion {
928                message: e.to_string(),
929            })?;
930        self.streaming_sources
931            .write()
932            .unwrap_or_else(|e| e.into_inner())
933            .insert(table_name.to_string());
934        self.has_streaming_sources.store(true, Ordering::Release);
935        self.invalidate_plan_cache();
936        Ok(())
937    }
938
939    /// Execute a SQL query and write every result row to a Kafka/Redpanda topic.
940    ///
941    /// Each row is serialised as a JSON object using the same format as
942    /// [`KafkaSink`].  The method blocks until the query stream ends and the
943    /// producer queue is flushed, then returns the total number of rows written.
944    ///
945    /// **Note**: If `sql` targets an unbounded streaming table (e.g. one
946    /// registered via [`register_kafka_source`]) this call will never return.
947    /// Use it with batch sources or add a `LIMIT` clause.
948    pub async fn sql_to_kafka(
949        &self,
950        sql: impl AsRef<str>,
951        bootstrap_servers: impl Into<String>,
952        topic: impl Into<String>,
953    ) -> SqlResult<u64> {
954        use futures::StreamExt;
955        use krishiv_connectors::Sink as _;
956        use krishiv_connectors::kafka::{KafkaConfig, KafkaSink};
957
958        let config = KafkaConfig {
959            bootstrap_servers: bootstrap_servers.into(),
960            topic: topic.into(),
961            group_id: "krishiv-sql-writer".into(),
962            auto_commit_interval_ms: None,
963            security_protocol: None,
964            ssl_ca_location: None,
965            ssl_certificate_location: None,
966            ssl_key_location: None,
967            ssl_key_password: None,
968            sasl_username: None,
969            sasl_password: None,
970            sasl_mechanisms: None,
971            enable_idempotence: None,
972            transactional_id: None,
973        };
974        let mut sink = KafkaSink::new(config).map_err(|e| SqlError::DataFusion {
975            message: e.to_string(),
976        })?;
977
978        let df = self.sql(sql.as_ref()).await?;
979        let mut stream = df.execute_stream().await?;
980        let mut total_rows = 0u64;
981
982        while let Some(result) = stream.next().await {
983            let batch = result.map_err(|e| SqlError::DataFusion {
984                message: e.to_string(),
985            })?;
986            if batch.num_rows() > 0 {
987                total_rows += batch.num_rows() as u64;
988                sink.write_batch(batch)
989                    .await
990                    .map_err(|e| SqlError::DataFusion {
991                        message: e.to_string(),
992                    })?;
993            }
994        }
995        sink.flush().await.map_err(|e| SqlError::DataFusion {
996            message: e.to_string(),
997        })?;
998        Ok(total_rows)
999    }
1000
1001    /// Configure this engine with explicit UDF resource limits (Track E).
1002    /// When set, calls to `sql()` and direct UDF syncs will use these budgets
1003    /// instead of unlimited defaults. Intended for job-specific engines.
1004    pub fn with_udf_limits(mut self, limits: krishiv_plan::udf::ResourceLimits) -> Self {
1005        self.udf_limits = Some(limits);
1006        self
1007    }
1008
1009    /// Returns `true` if `table_name` is registered as an unbounded streaming source.
1010    pub fn is_streaming_source(&self, table_name: &str) -> bool {
1011        self.streaming_sources
1012            .read()
1013            .unwrap_or_else(|e| e.into_inner())
1014            .contains(table_name)
1015    }
1016
1017    /// Register a table name as a streaming source without creating a live connector.
1018    ///
1019    /// This is the test-safe alternative to [`register_kafka_source`]: it marks
1020    /// `table_name` in the `streaming_sources` set so that `is_streaming_query`
1021    /// returns `true` for queries that reference it, without constructing any
1022    /// broker connection. Useful for unit tests where a live Kafka broker is not
1023    /// available and rdkafka's log subsystem is not initialised.
1024    /// Returns [`SqlError::EmptyTableName`] if `table_name` is blank.
1025    pub fn register_streaming_source_name(&self, table_name: impl Into<String>) -> SqlResult<()> {
1026        let name: String = table_name.into();
1027        if name.trim().is_empty() {
1028            return Err(SqlError::EmptyTableName);
1029        }
1030        self.streaming_sources
1031            .write()
1032            .unwrap_or_else(|e| e.into_inner())
1033            .insert(name);
1034        self.has_streaming_sources.store(true, Ordering::Release);
1035        self.invalidate_plan_cache();
1036        Ok(())
1037    }
1038
1039    /// Remove a streaming source registration.
1040    ///
1041    /// Deregisters the table from DataFusion and removes it from the streaming-
1042    /// sources set. Invalidates the plan cache. Idempotent — deregistering a
1043    /// name that was never registered is not an error.
1044    pub fn deregister_streaming_source(&self, name: &str) -> SqlResult<()> {
1045        if name.trim().is_empty() {
1046            return Err(SqlError::EmptyTableName);
1047        }
1048        // Idempotent: ignore the Option return (None when table wasn't registered).
1049        let _ = self
1050            .context
1051            .deregister_table(name)
1052            .map_err(SqlError::from)?;
1053        {
1054            let mut sources = self
1055                .streaming_sources
1056                .write()
1057                .unwrap_or_else(|e| e.into_inner());
1058            sources.remove(name);
1059            if sources.is_empty() {
1060                self.has_streaming_sources.store(false, Ordering::Release);
1061            }
1062            // Invalidate while still holding the write lock so there is no window
1063            // between source removal and cache invalidation where a concurrent
1064            // is_streaming_query returns false but serves a stale cached plan (N5).
1065            self.invalidate_plan_cache();
1066        }
1067        Ok(())
1068    }
1069
1070    /// Shared live-table registry for `CREATE LIVE TABLE` DDL.
1071    pub fn live_table_registry(&self) -> &Arc<live_table::LiveTableRegistry> {
1072        &self.live_table_registry
1073    }
1074
1075    /// Shared incremental-view registry for `CREATE INCREMENTAL VIEW` DDL.
1076    pub fn incremental_view_registry(&self) -> &Arc<incremental_view::IncrementalViewRegistry> {
1077        &self.incremental_view_registry
1078    }
1079
1080    /// Shared pipeline registry for `CREATE SOURCE` / `CREATE SINK` DDL.
1081    pub fn pipeline_registry(&self) -> &Arc<pipeline_ddl::PipelineRegistry> {
1082        &self.pipeline_registry
1083    }
1084
1085    /// Shared operation registry for cancellation and progress reporting.
1086    pub fn operation_registry(&self) -> &Arc<OperationRegistry> {
1087        &self.operation_registry
1088    }
1089
1090    /// Drop a named table from the session context.
1091    ///
1092    /// Idempotent — dropping a name that was never registered is not an error.
1093    pub fn deregister_table(&self, name: &str) -> SqlResult<()> {
1094        if name.trim().is_empty() {
1095            return Err(SqlError::EmptyTableName);
1096        }
1097        let _ = self
1098            .context
1099            .deregister_table(name)
1100            .map_err(SqlError::from)?;
1101        self.invalidate_plan_cache();
1102        Ok(())
1103    }
1104
1105    /// Register a table UDF backed by a Rust closure.
1106    ///
1107    /// The closure receives literal arguments passed by the SQL caller as
1108    /// `ScalarValue` values and returns an Arrow `RecordBatch`. Non-literal
1109    /// arguments are rejected because they cannot be evaluated safely at the
1110    /// synchronous DataFusion table-function boundary. `schema` describes the
1111    /// output columns.
1112    ///
1113    /// # Example
1114    /// ```ignore
1115    /// engine.register_table_udf_fn(
1116    ///     "generate_ints",
1117    ///     Schema::new(vec![Field::new("n", DataType::Int64, false)]),
1118    ///     |args| {
1119    ///         let count = match args.first() {
1120    ///             Some(ScalarValue::Int64(n)) => *n,
1121    ///             _ => 10,
1122    ///         };
1123    ///         let arr = Int64Array::from((0..count).collect::<Vec<_>>());
1124    ///         Ok(RecordBatch::try_from_iter([("n", Arc::new(arr) as _)])?)
1125    ///     },
1126    /// )?;
1127    /// ```
1128    pub fn register_table_udf_fn(
1129        &self,
1130        name: impl Into<String>,
1131        schema: arrow::datatypes::Schema,
1132        f: impl Fn(
1133            &[krishiv_plan::udf::ScalarValue],
1134        ) -> Result<arrow::record_batch::RecordBatch, krishiv_plan::udf::UdfError>
1135        + Send
1136        + Sync
1137        + 'static,
1138    ) -> SqlResult<()> {
1139        let udf =
1140            create_function_ddl::ClosureTableUdf::try_new(name, schema, std::sync::Arc::new(f))
1141                .map_err(|error| SqlError::InvalidTableFunction {
1142                    message: error.to_string(),
1143                })?;
1144        if let Some(registry) = &self.udf_registry {
1145            let mut guard = registry.write().map_err(|e| SqlError::DataFusion {
1146                message: e.to_string(),
1147            })?;
1148            guard.register_table(std::sync::Arc::new(udf.clone()));
1149        }
1150        udf::register_single_table_udf(&self.context, std::sync::Arc::new(udf))
1151            .map_err(SqlError::from)
1152    }
1153
1154    /// Returns `true` if any table referenced in `sql` is a registered streaming source.
1155    pub fn is_streaming_query(&self, sql: &str) -> SqlResult<bool> {
1156        // Fast-path: avoid the RwLock acquire and SQL parse for the common case
1157        // where no streaming sources have ever been registered (pure batch engines).
1158        if !self.has_streaming_sources.load(Ordering::Acquire) {
1159            return Ok(false);
1160        }
1161        let sources = self
1162            .streaming_sources
1163            .read()
1164            .unwrap_or_else(|e| e.into_inner());
1165        if sources.is_empty() {
1166            return Ok(false);
1167        }
1168        let dialect = GenericDialect {};
1169        let statements = Parser::parse_sql(&dialect, sql).map_err(|e| SqlError::DataFusion {
1170            message: e.to_string(),
1171        })?;
1172        for stmt in &statements {
1173            let mut is_streaming = false;
1174            let _ = visit_relations(stmt, |relation| {
1175                // relation.to_string() yields the fully-qualified name (e.g. "schema.table").
1176                // Extract the unqualified table name (last segment after dot).
1177                let full = relation.to_string();
1178                let table_name = full.split('.').next_back().unwrap_or(&full);
1179                if sources.contains(table_name) {
1180                    is_streaming = true;
1181                    return ControlFlow::Break(());
1182                }
1183                ControlFlow::Continue(())
1184            });
1185            if is_streaming {
1186                return Ok(true);
1187            }
1188        }
1189        Ok(false)
1190    }
1191
1192    /// Shared Krishiv catalog backing this engine, if configured.
1193    pub fn krishiv_catalog(&self) -> Option<&Arc<RwLock<InMemoryCatalog>>> {
1194        self.krishiv_catalog.as_ref()
1195    }
1196
1197    /// Register an Iceberg [`KrishivCatalog`] as a DataFusion catalog provider.
1198    ///
1199    /// Tables in the catalog are resolved automatically by DataFusion when SQL
1200    /// queries reference `<catalog_name>.<namespace>.<table>`. The bridge uses
1201    /// `plan_files()` to enumerate Parquet files and wraps them in a
1202    /// `ListingTable`, giving DataFusion native projection/filter pushdown.
1203    ///
1204    /// Multiple catalogs can be registered under different names.
1205    #[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
1206    #[must_use]
1207    pub fn with_iceberg_catalog(
1208        self,
1209        catalog: std::sync::Arc<catalog::unified::KrishivCatalog>,
1210        catalog_name: impl Into<String>,
1211    ) -> Self {
1212        let catalog_name = catalog_name.into();
1213        let bridge = catalog::iceberg_catalog_bridge::IcebergCatalogBridge::new(
1214            Arc::clone(&catalog),
1215            catalog_name.clone(),
1216        );
1217        self.context
1218            .register_catalog(catalog_name.clone(), Arc::new(bridge));
1219        self.iceberg_catalogs
1220            .write()
1221            .unwrap_or_else(|e| e.into_inner())
1222            .push((catalog, catalog_name));
1223        self
1224    }
1225
1226    /// Share a session UDF registry so scalar UDFs are visible in SQL.
1227    #[must_use]
1228    pub fn with_udf_registry(
1229        mut self,
1230        registry: std::sync::Arc<std::sync::RwLock<krishiv_plan::udf::UdfRegistry>>,
1231    ) -> Self {
1232        self.udf_registry = Some(registry);
1233        // Mark UDFs as dirty so the first sql() call syncs them.
1234        self.bump_udf_version();
1235        self
1236    }
1237
1238    /// Increment the UDF version counter to signal that `sync_all_udfs()` is
1239    /// needed on the next `sql()` call.
1240    pub(crate) fn bump_udf_version(&self) {
1241        self.udf_registry_version.fetch_add(1, Ordering::Release);
1242    }
1243
1244    /// Invalidate the plan cache after any schema change. Call this whenever a
1245    /// table is registered, replaced, or deregistered. Full invalidation is
1246    /// simpler and safer than per-table tracking: the cache refills quickly on
1247    /// the next few queries.
1248    fn invalidate_plan_cache(&self) {
1249        match self.plan_cache.lock() {
1250            Ok(mut cache) => cache.clear(),
1251            Err(poisoned) => poisoned.into_inner().clear(),
1252        }
1253    }
1254
1255    /// Expose cache invalidation for tests and external callers that register
1256    /// tables through a different path.
1257    pub fn clear_plan_cache(&self) {
1258        self.invalidate_plan_cache();
1259    }
1260
1261    /// Register all scalar UDFs from the attached registry with DataFusion.
1262    /// Uses unlimited defaults (backward compat).
1263    pub async fn sync_scalar_udfs(&self) -> SqlResult<()> {
1264        let Some(registry) = &self.udf_registry else {
1265            return Ok(());
1266        };
1267        let guard = registry.read().map_err(|e| SqlError::DataFusion {
1268            message: e.to_string(),
1269        })?;
1270        let limits = self.udf_limits.clone().unwrap_or_default();
1271        udf::sync_scalar_udfs_with_limits(&self.context, &guard, limits).map_err(|e| {
1272            SqlError::DataFusion {
1273                message: e.to_string(),
1274            }
1275        })
1276    }
1277
1278    /// Register scalar UDFs with explicit ResourceLimits for sandbox enforcement.
1279    /// Callers that have a job context (scheduler, runner, api session for a job)
1280    /// should use this and pass limits derived from the JobSpec (memory + time cap).
1281    /// This is the concrete Track E seam from job limits to UDF execution.
1282    pub async fn sync_scalar_udfs_with_limits(
1283        &self,
1284        limits: krishiv_plan::udf::ResourceLimits,
1285    ) -> SqlResult<()> {
1286        self.sync_scalar_udfs_with_limits_for_profile(
1287            limits,
1288            krishiv_common::resolve_durability_profile(),
1289        )
1290        .await
1291    }
1292
1293    /// Register scalar UDFs using a caller-resolved durability profile.
1294    pub async fn sync_scalar_udfs_with_limits_for_profile(
1295        &self,
1296        limits: krishiv_plan::udf::ResourceLimits,
1297        profile: krishiv_common::DurabilityProfile,
1298    ) -> SqlResult<()> {
1299        self.sync_scalar_udfs_with_limits_for_policy(
1300            limits,
1301            krishiv_common::NativeScalarUdfPolicy::resolve(profile),
1302        )
1303        .await
1304    }
1305
1306    /// Register scalar UDFs using a caller-snapshotted policy decision.
1307    pub async fn sync_scalar_udfs_with_limits_for_policy(
1308        &self,
1309        limits: krishiv_plan::udf::ResourceLimits,
1310        policy: krishiv_common::NativeScalarUdfPolicy,
1311    ) -> SqlResult<()> {
1312        let Some(registry) = &self.udf_registry else {
1313            return Ok(());
1314        };
1315        let guard = registry.read().map_err(|e| SqlError::DataFusion {
1316            message: e.to_string(),
1317        })?;
1318        udf::sync_scalar_udfs_with_limits_for_policy(&self.context, &guard, limits, policy).map_err(
1319            |e| SqlError::DataFusion {
1320                message: e.to_string(),
1321            },
1322        )
1323    }
1324
1325    /// Register aggregate UDFs from the attached registry (P1-21).
1326    pub async fn sync_aggregate_udfs(&self) -> SqlResult<()> {
1327        let Some(registry) = &self.udf_registry else {
1328            return Ok(());
1329        };
1330        let guard = registry.read().map_err(|e| SqlError::DataFusion {
1331            message: e.to_string(),
1332        })?;
1333        udf::sync_aggregate_udfs(&self.context, &guard).map_err(|e| SqlError::DataFusion {
1334            message: e.to_string(),
1335        })
1336    }
1337
1338    /// Register table UDFs from the attached registry (P1-21).
1339    pub async fn sync_table_udfs(&self) -> SqlResult<()> {
1340        let Some(registry) = &self.udf_registry else {
1341            return Ok(());
1342        };
1343        let guard = registry.read().map_err(|e| SqlError::DataFusion {
1344            message: e.to_string(),
1345        })?;
1346        udf::sync_table_udfs(&self.context, &guard).map_err(|e| SqlError::DataFusion {
1347            message: e.to_string(),
1348        })
1349    }
1350
1351    /// Sync all UDF categories, respecting any limits configured on this engine (Track E).
1352    pub async fn sync_all_udfs(&self) -> SqlResult<()> {
1353        self.sync_scalar_udfs().await?;
1354        self.sync_aggregate_udfs().await?;
1355        self.sync_table_udfs().await?;
1356        Ok(())
1357    }
1358
1359    /// Register a local Parquet path as a table.
1360    pub async fn register_parquet(
1361        &self,
1362        table_name: impl AsRef<str>,
1363        path: impl AsRef<Path>,
1364    ) -> SqlResult<()> {
1365        let table_name = table_name.as_ref();
1366        if table_name.trim().is_empty() {
1367            return Err(SqlError::EmptyTableName);
1368        }
1369
1370        let path = path.as_ref().to_string_lossy().into_owned();
1371
1372        // Register an S3 ObjectStore when the path is an s3:// URL so DataFusion
1373        // can read remote Parquet files transparently.
1374        if path.starts_with("s3://") {
1375            let url = url::Url::parse(&path).map_err(|e| SqlError::DataFusion {
1376                message: format!("invalid s3 url {path}: {e}"),
1377            })?;
1378            let bucket = url.host_str().unwrap_or_default();
1379            let store_url =
1380                url::Url::parse(&format!("s3://{bucket}")).map_err(|e| SqlError::DataFusion {
1381                    message: format!("invalid s3 bucket url: {e}"),
1382                })?;
1383            let store = AmazonS3Builder::from_env()
1384                .with_bucket_name(bucket)
1385                .build()
1386                .map_err(|e| SqlError::DataFusion {
1387                    message: format!("s3 store init: {e}"),
1388                })?;
1389            self.context
1390                .register_object_store(&store_url, Arc::new(store));
1391        }
1392
1393        if self
1394            .context
1395            .table_exist(table_name)
1396            .map_err(SqlError::from)?
1397        {
1398            let _ = self
1399                .context
1400                .deregister_table(table_name)
1401                .map_err(SqlError::from)?;
1402        }
1403        self.context
1404            .register_parquet(table_name, path, ParquetReadOptions::default())
1405            .await?;
1406        // Extract estimated row count from table provider statistics.
1407        if let Ok(provider) = self.context.table_provider(table_name).await
1408            && let Some(stats) = provider.statistics()
1409            && let Some(n) = stats.num_rows.get_value()
1410            && let Ok(mut counts) = self.table_row_counts.write()
1411        {
1412            counts.insert(table_name.to_string(), *n as u64);
1413        }
1414        self.invalidate_plan_cache();
1415        Ok(())
1416    }
1417
1418    /// Create a DataFrame by reading a local Parquet path directly.
1419    pub async fn read_parquet(&self, path: impl AsRef<Path>) -> SqlResult<SqlDataFrame> {
1420        let path = path.as_ref().to_string_lossy().into_owned();
1421        let dataframe = self
1422            .context
1423            .read_parquet(path, ParquetReadOptions::default())
1424            .await?;
1425        Ok(self.make_sql_df("parquet-read", dataframe))
1426    }
1427
1428    /// Register an in-memory table from Arrow record batches.
1429    ///
1430    /// The schema is inferred from the first batch. An empty `batches` slice
1431    /// registers a table with no rows using the provided schema if the batches
1432    /// are non-empty, or is a no-op if empty.
1433    pub async fn register_record_batches(
1434        &self,
1435        table_name: impl AsRef<str>,
1436        batches: Vec<RecordBatch>,
1437    ) -> SqlResult<()> {
1438        use std::sync::Arc;
1439        let table_name = table_name.as_ref();
1440        if table_name.trim().is_empty() {
1441            return Err(SqlError::EmptyTableName);
1442        }
1443        if batches.is_empty() {
1444            return Ok(());
1445        }
1446        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1447        let schema = batches
1448            .first()
1449            .ok_or_else(|| SqlError::DataFusion {
1450                message: "empty batch list".into(),
1451            })?
1452            .schema();
1453        let mem_table =
1454            datafusion::datasource::MemTable::try_new(schema, vec![batches]).map_err(|e| {
1455                SqlError::DataFusion {
1456                    message: e.to_string(),
1457                }
1458            })?;
1459        if self
1460            .context
1461            .table_exist(table_name)
1462            .map_err(SqlError::from)?
1463        {
1464            let _ = self
1465                .context
1466                .deregister_table(table_name)
1467                .map_err(SqlError::from)?;
1468        }
1469        self.context
1470            .register_table(table_name, Arc::new(mem_table))
1471            .map_err(|e| SqlError::DataFusion {
1472                message: e.to_string(),
1473            })?;
1474        if total_rows > 0
1475            && let Ok(mut counts) = self.table_row_counts.write()
1476        {
1477            counts.insert(table_name.to_string(), total_rows as u64);
1478        }
1479        self.invalidate_plan_cache();
1480        Ok(())
1481    }
1482
1483    /// Create a DataFrame by reading a local Parquet path with typed options.
1484    pub async fn read_parquet_with_options(
1485        &self,
1486        path: impl AsRef<Path>,
1487        opts: &ParquetReaderOptions,
1488    ) -> SqlResult<SqlDataFrame> {
1489        let path = path.as_ref().to_string_lossy().into_owned();
1490        let mut options = datafusion::prelude::ParquetReadOptions::default();
1491        if opts.batch_size.is_some() {
1492            options = options.parquet_pruning(true);
1493        }
1494        // NOTE: `batch_size` is not yet propagated here because DataFusion's
1495        // ParquetReadOptions has no batch_size field — it lives on SessionConfig.
1496        // Callers should set batch_size on the SqlEngine's session config before
1497        // calling this method (via `SessionContext::new_with_state` with a config
1498        // that has `execution.batch_size` set).
1499        let dataframe = self.context.read_parquet(path, options).await?;
1500        Ok(self.make_sql_df("parquet-read", dataframe))
1501    }
1502
1503    /// Create a DataFrame by reading a local CSV path directly.
1504    pub async fn read_csv(&self, path: impl AsRef<Path>) -> SqlResult<SqlDataFrame> {
1505        self.read_csv_with_options(path, &CsvReaderOptions::default())
1506            .await
1507    }
1508
1509    /// Create a DataFrame by reading a local CSV path with typed options.
1510    pub async fn read_csv_with_options(
1511        &self,
1512        path: impl AsRef<Path>,
1513        opts: &CsvReaderOptions,
1514    ) -> SqlResult<SqlDataFrame> {
1515        let path = path.as_ref().to_string_lossy().into_owned();
1516        let mut options = datafusion::prelude::CsvReadOptions::new();
1517        if let Some(delim) = opts.delimiter {
1518            options = options.delimiter(delim as u8);
1519        }
1520        if let Some(has_header) = opts.has_header {
1521            options = options.has_header(has_header);
1522        }
1523        let dataframe = self.context.read_csv(path, options).await?;
1524        Ok(self.make_sql_df("csv-read", dataframe))
1525    }
1526
1527    /// Create a DataFrame by reading a local JSON/NDJSON path directly.
1528    pub async fn read_json(&self, path: impl AsRef<Path>) -> SqlResult<SqlDataFrame> {
1529        let path = path.as_ref().to_string_lossy().into_owned();
1530        let dataframe = self
1531            .context
1532            .read_json(path, datafusion::prelude::JsonReadOptions::default())
1533            .await?;
1534        Ok(self.make_sql_df("json-read", dataframe))
1535    }
1536
1537    /// Read a local Delta table directory into a DataFrame.
1538    pub async fn read_delta(
1539        &self,
1540        path: impl AsRef<str>,
1541        version: Option<i64>,
1542    ) -> SqlResult<SqlDataFrame> {
1543        let path = path.as_ref();
1544        let base = path.replace(['/', '.', '-'], "_");
1545        let table = match version {
1546            Some(v) => format!("delta_{base}_v{v}"),
1547            None => format!("delta_{base}"),
1548        };
1549        lakehouse::register_delta_uri(&self.context, &table, path, version).await?;
1550        self.sql(format!("SELECT * FROM {table}")).await
1551    }
1552
1553    /// Read a Hudi table directory.
1554    pub async fn read_hudi(
1555        &self,
1556        path: impl AsRef<str>,
1557        query_type: krishiv_connectors::lakehouse::HudiQueryType,
1558        begin_instant: Option<&str>,
1559    ) -> SqlResult<SqlDataFrame> {
1560        let path = path.as_ref();
1561        let table = format!("hudi_{}", path.replace(['/', '.', '-'], "_"));
1562        lakehouse::register_hudi_uri(&self.context, &table, path, query_type, begin_instant)
1563            .await?;
1564        self.sql(format!("SELECT * FROM {table}")).await
1565    }
1566
1567    /// Plan a SQL query with DataFusion.
1568    pub async fn sql(&self, query: impl AsRef<str>) -> SqlResult<SqlDataFrame> {
1569        let query = query.as_ref();
1570        if query.trim().is_empty() {
1571            return Err(SqlError::EmptyQuery);
1572        }
1573
1574        // Lazy UDF sync: only re-sync when the registry has changed since the
1575        // last sync. Avoids 3 RwLock reads per query when no UDFs are registered
1576        // or when the UDF set hasn't changed.
1577        {
1578            let current = self.udf_registry_version.load(Ordering::Acquire);
1579            let last = self.udf_last_synced_version.load(Ordering::Relaxed);
1580            if current != last {
1581                self.sync_all_udfs().await?;
1582                self.udf_last_synced_version
1583                    .store(current, Ordering::Release);
1584            }
1585        }
1586
1587        // ── Intercept DESCRIBE / SHOW COLUMNS / EXPLAIN ──────────────────────
1588        if let Some(stmt) = introspection_sql::parse_introspection_statement(query)? {
1589            return match stmt {
1590                introspection_sql::IntrospectionStatement::Describe { table } => {
1591                    let batch = introspection_sql::describe_table(&self.context, &table).await?;
1592                    let describe_table_name = next_ephemeral_name("describe_result");
1593                    lakehouse::register_scan_batches(
1594                        &self.context,
1595                        &describe_table_name,
1596                        vec![batch],
1597                    )
1598                    .await?;
1599                    let dataframe = self
1600                        .context
1601                        .sql(&format!("SELECT * FROM {describe_table_name}"))
1602                        .await?;
1603                    Ok(self.attach_query_metadata(self.make_sql_df("describe", dataframe), query))
1604                }
1605                introspection_sql::IntrospectionStatement::Explain { mode, query: inner } => {
1606                    let text = introspection_sql::explain_query(&inner, mode)?;
1607                    let batch = introspection_sql::explain_result_batch(&text)?;
1608                    let explain_table = next_ephemeral_name("explain_result");
1609                    lakehouse::register_scan_batches(&self.context, &explain_table, vec![batch])
1610                        .await?;
1611                    let dataframe = self
1612                        .context
1613                        .sql(&format!("SELECT * FROM {explain_table}"))
1614                        .await?;
1615                    Ok(self.attach_query_metadata(self.make_sql_df("explain", dataframe), query))
1616                }
1617            };
1618        }
1619
1620        // ── Intercept CREATE / REFRESH / DROP LIVE TABLE ─────────────────────
1621        if live_table::execute_live_table_ddl(&self.live_table_registry, query)?.is_some() {
1622            let empty = self.context.sql("SELECT 1 WHERE FALSE").await?;
1623            return Ok(self.attach_query_metadata(self.make_sql_df("live-table-ddl", empty), query));
1624        }
1625
1626        // ── Intercept CREATE/DECLARE/REFRESH/DROP INCREMENTAL VIEW ───────────
1627        if incremental_view::execute_incremental_view_ddl(&self.incremental_view_registry, query)?
1628            .is_some()
1629        {
1630            let empty = self.context.sql("SELECT 1 WHERE FALSE").await?;
1631            return Ok(
1632                self.attach_query_metadata(self.make_sql_df("incremental-view-ddl", empty), query)
1633            );
1634        }
1635
1636        // ── Intercept CREATE/DROP SOURCE / SINK (pipeline DDL) ───────────────
1637        // `START PIPELINE` is NOT handled here — it is executed by the
1638        // `krishiv-api` session, which can reach `Session::pipeline()`.
1639        if pipeline_ddl::execute_pipeline_ddl(&self.pipeline_registry, query)?.is_some() {
1640            let empty = self.context.sql("SELECT 1 WHERE FALSE").await?;
1641            return Ok(self.attach_query_metadata(self.make_sql_df("pipeline-ddl", empty), query));
1642        }
1643
1644        // ── Intercept SET shuffle.partitions = N ─────────────────────────────
1645        // Krishiv-specific session config; DataFusion does not know about it.
1646        let trimmed = query.trim();
1647        if trimmed
1648            .to_ascii_uppercase()
1649            .starts_with("SET SHUFFLE.PARTITIONS")
1650        {
1651            let value = trimmed.split('=').nth(1).map(|s| s.trim()).unwrap_or("");
1652            match value.parse::<u32>() {
1653                Ok(n) if n > 0 => {
1654                    {
1655                        let mut guard =
1656                            self.shuffle_partitions
1657                                .write()
1658                                .map_err(|e| SqlError::DataFusion {
1659                                    message: e.to_string(),
1660                                })?;
1661                        *guard = Some(n);
1662                    }
1663                    let empty = self.context.sql("SELECT 1 WHERE FALSE").await?;
1664                    return Ok(self.make_sql_df("set-shuffle-partitions", empty));
1665                }
1666                Ok(_) => {
1667                    {
1668                        let mut guard =
1669                            self.shuffle_partitions
1670                                .write()
1671                                .map_err(|e| SqlError::DataFusion {
1672                                    message: e.to_string(),
1673                                })?;
1674                        *guard = None;
1675                    }
1676                    let empty = self.context.sql("SELECT 1 WHERE FALSE").await?;
1677                    return Ok(self.make_sql_df("set-shuffle-partitions", empty));
1678                }
1679                Err(_) => {
1680                    return Err(SqlError::DataFusion {
1681                        message: format!(
1682                            "invalid shuffle.partitions value '{value}'; expected a positive integer"
1683                        ),
1684                    });
1685                }
1686            }
1687        }
1688
1689        // ── Intercept CREATE FUNCTION … RETURNS TABLE ────────────────────────
1690        // DataFusion does not understand this extended DDL syntax. Parse and
1691        // register only executable LANGUAGE SQL definitions; unsupported
1692        // languages fail before any registry mutation.
1693        if create_function_ddl::is_create_function_returns_table(query) {
1694            let ddl = create_function_ddl::parse_create_function(query)
1695                .map_err(|message| SqlError::InvalidTableFunction { message })?;
1696            if ddl.language.as_deref() != Some("sql") {
1697                return Err(SqlError::Unsupported {
1698                    feature: format!(
1699                        "CREATE FUNCTION '{}' uses language {:?}; only LANGUAGE SQL AS '...' \
1700                         table functions are executable",
1701                        ddl.function_name, ddl.language
1702                    ),
1703                });
1704            }
1705            let body = ddl
1706                .body
1707                .as_deref()
1708                .filter(|body| !body.trim().is_empty())
1709                .ok_or_else(|| SqlError::InvalidTableFunction {
1710                    message: format!(
1711                        "SQL table function '{}' requires a non-empty AS body",
1712                        ddl.function_name
1713                    ),
1714                })?;
1715            let fields: Vec<_> = ddl
1716                .return_columns
1717                .iter()
1718                .map(|column| {
1719                    arrow::datatypes::Field::new(&column.name, column.data_type.clone(), true)
1720                })
1721                .collect();
1722            let schema = arrow::datatypes::Schema::new(fields);
1723            let udf: std::sync::Arc<dyn krishiv_plan::udf::TableUdf> = std::sync::Arc::new(
1724                create_function_ddl::SqlBodyTableUdf::try_new(
1725                    &ddl.function_name,
1726                    schema,
1727                    body,
1728                    ddl.arguments.len(),
1729                    std::sync::Arc::new(self.context.clone()),
1730                )
1731                .map_err(|error| SqlError::InvalidTableFunction {
1732                    message: error.to_string(),
1733                })?,
1734            );
1735            if let Some(registry) = &self.udf_registry {
1736                let mut guard = registry.write().map_err(|e| SqlError::DataFusion {
1737                    message: e.to_string(),
1738                })?;
1739                guard.register_table(std::sync::Arc::clone(&udf));
1740            }
1741            udf::register_single_table_udf(&self.context, std::sync::Arc::clone(&udf))
1742                .map_err(SqlError::from)?;
1743            let empty = self.context.sql("SELECT 1 WHERE FALSE").await?;
1744            return Ok(
1745                self.attach_query_metadata(self.make_sql_df("create-function", empty), query)
1746            );
1747        }
1748
1749        if query
1750            .trim_start()
1751            .to_ascii_uppercase()
1752            .starts_with("MERGE INTO")
1753        {
1754            let batches = lakehouse::execute_merge_sql(&self.context, query).await?;
1755            let merge_table = next_ephemeral_name("merge_result");
1756            lakehouse::register_scan_batches(&self.context, &merge_table, batches).await?;
1757            let dataframe = self
1758                .context
1759                .sql(&format!("SELECT * FROM {merge_table}"))
1760                .await?;
1761            return Ok(self.attach_query_metadata(self.make_sql_df("merge", dataframe), query));
1762        }
1763
1764        // ── Intercept CALL system.<proc> ──────────────────────────────────────
1765        // Route Iceberg maintenance procedures to registered KrishivCatalogs.
1766        #[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
1767        if trimmed.to_ascii_uppercase().starts_with("CALL SYSTEM.") {
1768            let result = self.dispatch_call_system(trimmed).await?;
1769            let call_table = next_ephemeral_name("call_result");
1770            lakehouse::register_scan_batches(&self.context, &call_table, vec![result]).await?;
1771            let dataframe = self
1772                .context
1773                .sql(&format!("SELECT * FROM {call_table}"))
1774                .await?;
1775            return Ok(self.attach_query_metadata(self.make_sql_df("call", dataframe), query));
1776        }
1777
1778        // ── Intercept DELETE FROM <iceberg-table> [WHERE …] ──────────────────
1779        // Route to copy-on-write iceberg_delete_where when the table is tracked
1780        // by a registered KrishivCatalog. Falls through to DataFusion otherwise.
1781        #[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
1782        if trimmed.to_ascii_uppercase().starts_with("DELETE FROM ") {
1783            if let Some((table_ref, predicate)) = parse_dml_delete(trimmed) {
1784                if let Some((iceberg_catalog, table_ident)) = self.resolve_iceberg_table(&table_ref)
1785                {
1786                    use arrow::array::{ArrayRef, Int64Array};
1787                    use arrow::datatypes::{DataType, Field, Schema};
1788                    let (deleted, _) = krishiv_connectors::lakehouse::dml::iceberg_delete_where(
1789                        iceberg_catalog,
1790                        &table_ident,
1791                        &predicate,
1792                        &self.context,
1793                    )
1794                    .await
1795                    .map_err(|e| SqlError::DataFusion {
1796                        message: e.to_string(),
1797                    })?;
1798                    let schema = Arc::new(Schema::new(vec![Field::new(
1799                        "deleted_rows",
1800                        DataType::Int64,
1801                        false,
1802                    )]));
1803                    let array: ArrayRef = Arc::new(Int64Array::from(vec![deleted as i64]));
1804                    let batch = RecordBatch::try_new(schema, vec![array]).map_err(|e| {
1805                        SqlError::DataFusion {
1806                            message: e.to_string(),
1807                        }
1808                    })?;
1809                    let res_table = next_ephemeral_name("delete_result");
1810                    lakehouse::register_scan_batches(&self.context, &res_table, vec![batch])
1811                        .await?;
1812                    let dataframe = self
1813                        .context
1814                        .sql(&format!("SELECT * FROM {res_table}"))
1815                        .await?;
1816                    return Ok(
1817                        self.attach_query_metadata(self.make_sql_df("delete", dataframe), query)
1818                    );
1819                }
1820            }
1821        }
1822
1823        // ── Intercept UPDATE <iceberg-table> SET … [WHERE …] ─────────────────
1824        #[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
1825        if trimmed.to_ascii_uppercase().starts_with("UPDATE ") {
1826            if let Some(parsed) = parse_dml_update(trimmed) {
1827                if let Some((iceberg_catalog, table_ident)) =
1828                    self.resolve_iceberg_table(&parsed.table_ref)
1829                {
1830                    use arrow::array::{ArrayRef, Int64Array};
1831                    use arrow::datatypes::{DataType, Field, Schema};
1832                    let borrowed: Vec<(&str, &str)> = parsed
1833                        .assignments
1834                        .iter()
1835                        .map(|(c, e)| (c.as_str(), e.as_str()))
1836                        .collect();
1837                    let pred = parsed.predicate.as_deref();
1838                    let (updated, _) = krishiv_connectors::lakehouse::dml::iceberg_update_where(
1839                        iceberg_catalog,
1840                        &table_ident,
1841                        &borrowed,
1842                        pred,
1843                        &self.context,
1844                    )
1845                    .await
1846                    .map_err(|e| SqlError::DataFusion {
1847                        message: e.to_string(),
1848                    })?;
1849                    let schema = Arc::new(Schema::new(vec![Field::new(
1850                        "updated_rows",
1851                        DataType::Int64,
1852                        false,
1853                    )]));
1854                    let array: ArrayRef = Arc::new(Int64Array::from(vec![updated as i64]));
1855                    let batch = RecordBatch::try_new(schema, vec![array]).map_err(|e| {
1856                        SqlError::DataFusion {
1857                            message: e.to_string(),
1858                        }
1859                    })?;
1860                    let res_table = next_ephemeral_name("update_result");
1861                    lakehouse::register_scan_batches(&self.context, &res_table, vec![batch])
1862                        .await?;
1863                    let dataframe = self
1864                        .context
1865                        .sql(&format!("SELECT * FROM {res_table}"))
1866                        .await?;
1867                    return Ok(
1868                        self.attach_query_metadata(self.make_sql_df("update", dataframe), query)
1869                    );
1870                }
1871            }
1872        }
1873
1874        // ── Intercept MATCH_RECOGNIZE ─────────────────────────────────────────
1875        // DataFusion does not parse MATCH_RECOGNIZE. Route it through the CEP
1876        // path: parse → run PatternMatcher on the source table → return results.
1877        if query.to_ascii_uppercase().contains(" MATCH_RECOGNIZE ")
1878            && let Some(stmt) = cep_sql::parse_match_recognize(query)?
1879        {
1880            let is_streaming = self.is_streaming_source(&stmt.source_table);
1881            // For streaming sources collect a bounded window of recent events
1882            // (capped at the configured limit) so the query terminates. The
1883            // cap is configurable through `KRISHIV_MATCH_RECOGNIZE_STREAMING_LIMIT`
1884            // (default 100_000) so users can raise it for high-rate streams
1885            // or lower it to bound memory on small executors. The truncation
1886            // is logged at warn level because the result is no longer a
1887            // complete match over the unbounded stream.
1888            let streaming_limit = streaming_match_recognize_limit_from_env();
1889            let source_sql = if is_streaming {
1890                format!(
1891                    "SELECT * FROM {} LIMIT {}",
1892                    stmt.source_table, streaming_limit
1893                )
1894            } else {
1895                format!("SELECT * FROM {}", stmt.source_table)
1896            };
1897            let source_df = self.context.sql(&source_sql).await?;
1898            let source_batches = source_df.collect().await?;
1899            if is_streaming {
1900                tracing::warn!(
1901                    source = %stmt.source_table,
1902                    limit = streaming_limit,
1903                    collected_rows = source_batches.iter().map(|b| b.num_rows()).sum::<usize>(),
1904                    "MATCH_RECOGNIZE executed against a streaming source under \
1905                     bounded materialisation; results only cover the first {0} rows \
1906                     of the source. Set KRISHIV_MATCH_RECOGNIZE_STREAMING_LIMIT to a \
1907                     larger value if your executor has the memory budget.",
1908                    streaming_limit
1909                );
1910            }
1911            let results = cep_sql::execute_match_recognize(stmt, &source_batches)?;
1912            let cep_table = next_ephemeral_name("cep_result");
1913            lakehouse::register_scan_batches(&self.context, &cep_table, results).await?;
1914            let dataframe = self
1915                .context
1916                .sql(&format!("SELECT * FROM {cep_table}"))
1917                .await?;
1918            return Ok(self.attach_query_metadata(self.make_sql_df("cep", dataframe), query));
1919        }
1920
1921        // Rewrite TUMBLE/HOP/SESSION TVFs before other preprocessing.
1922        let query = &streaming_tvf::rewrite_window_tvfs(query);
1923
1924        let (rewritten, as_ofs) =
1925            lakehouse::preprocess_as_of_sql(query).unwrap_or_else(|_| (query.to_string(), vec![]));
1926        lakehouse::apply_as_of_refs(&self.context, &as_ofs).await?;
1927
1928        // ── Plan cache ────────────────────────────────────────────────────────
1929        // Check the cache before sending the query through DataFusion's full
1930        // parse → analyse → optimise pipeline. Only cache simple queries without
1931        // DDL or AS-OF refs; DDL side effects must not be bypassed.
1932        // Single-lock design: lookup and insert share the same Mutex<PlanCache>,
1933        // eliminating the TOCTOU race of the previous DashMap + VecDeque approach.
1934        let can_cache = as_ofs.is_empty();
1935        let shuffle_override = self
1936            .shuffle_partitions
1937            .read()
1938            .map(|g| *g)
1939            .unwrap_or_else(|e| *e.into_inner());
1940        if can_cache {
1941            // Scope the guard so it is dropped before any .await point.
1942            let cached_plan: Option<datafusion::logical_expr::LogicalPlan> = self
1943                .plan_cache
1944                .lock()
1945                .unwrap_or_else(|e| e.into_inner())
1946                .get(&rewritten)
1947                .cloned();
1948            if let Some(plan) = cached_plan {
1949                let dataframe = self.context.execute_logical_plan(plan).await?;
1950                return Ok(self.attach_query_metadata(
1951                    self.make_sql_df("sql-query", dataframe)
1952                        .with_shuffle_partitions(shuffle_override),
1953                    &rewritten,
1954                ));
1955            }
1956        }
1957
1958        let dataframe = self.context.sql(&rewritten).await?;
1959
1960        // After CREATE EXTERNAL TABLE DDL, try to extract row-count statistics
1961        // from the newly registered table provider so `BroadcastAutoRule` can
1962        // fire for small connector-backed tables (e.g. Parquet/S3 via DDL).
1963        if let Some(table_name) = extract_create_external_table_name(&rewritten)
1964            && !table_name.is_empty()
1965            && let Ok(provider) = self.context.table_provider(&table_name).await
1966        {
1967            let maybe_rows = provider
1968                .statistics()
1969                .and_then(|s| s.num_rows.get_value().copied());
1970            if let Some(n) = maybe_rows
1971                && let Ok(mut counts) = self.table_row_counts.write()
1972            {
1973                counts.entry(table_name).or_insert(n as u64);
1974            }
1975        }
1976
1977        // Cache the logical plan for future repeated calls.
1978        if can_cache {
1979            let plan = dataframe.logical_plan().clone();
1980            match self.plan_cache.lock() {
1981                Ok(mut cache) => cache.insert(rewritten.clone(), plan),
1982                Err(poisoned) => poisoned.into_inner().insert(rewritten.clone(), plan),
1983            }
1984        }
1985
1986        Ok(self.attach_query_metadata(
1987            self.make_sql_df("sql-query", dataframe)
1988                .with_shuffle_partitions(shuffle_override),
1989            &rewritten,
1990        ))
1991    }
1992
1993    /// Execute a SQL query with a timeout.
1994    ///
1995    /// Returns [`SqlError::Timeout`] if `timeout_ms` elapses before the query
1996    /// produces a result.  The underlying DataFusion task is abandoned (not
1997    /// cancelled at the engine level) when the timeout fires; its resources are
1998    /// released when the spawned task eventually completes.
1999    pub async fn execute_with_timeout(
2000        &self,
2001        query: impl AsRef<str> + Send,
2002        timeout_ms: u64,
2003    ) -> SqlResult<SqlDataFrame> {
2004        let timeout = std::time::Duration::from_millis(timeout_ms);
2005        tokio::time::timeout(timeout, self.sql(query))
2006            .await
2007            .map_err(|_| SqlError::Timeout { timeout_ms })?
2008    }
2009
2010    /// Execute a SQL query tagged with a caller-supplied operation ID.
2011    ///
2012    /// The operation ID is recorded in the returned [`TaggedQueryResult`] and
2013    /// can be used to correlate logs, metrics, and cancellation requests.
2014    /// If `cancelled_ids` contains `operation_id` before execution begins the
2015    /// function returns [`SqlError::OperationCancelled`] immediately.
2016    pub async fn execute_with_operation_id(
2017        &self,
2018        operation_id: u64,
2019        query: impl AsRef<str> + Send,
2020        cancelled_ids: &OperationRegistry,
2021    ) -> SqlResult<TaggedQueryResult> {
2022        if cancelled_ids.is_cancelled(operation_id) {
2023            return Err(SqlError::OperationCancelled { operation_id });
2024        }
2025        let df = self.sql(query).await?;
2026        Ok(TaggedQueryResult {
2027            operation_id,
2028            inner: df,
2029        })
2030    }
2031
/// Resolve a SQL table reference to an `(Arc<dyn Catalog>, TableIdent)` pair
/// from the registered Iceberg catalogs.
///
/// Accepts 2-part (`ns.tbl`) and 3-part (`cat.ns.tbl`) references:
///
/// * a 2-part reference is resolved against the first registered catalog;
/// * a 3-part reference is resolved against the catalog registered under the
///   name given by its first part.
///
/// Returns `None` when no matching catalog is registered, when the reference
/// does not consist of exactly two or three dot-separated parts, or when any
/// part is empty. References with more parts are rejected rather than having
/// their tail folded into the table name.
#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
fn resolve_iceberg_table(
    &self,
    table_ref: &str,
) -> Option<(Arc<dyn iceberg::Catalog + Send + Sync>, iceberg::TableIdent)> {
    let parts: Vec<&str> = table_ref.split('.').collect();
    if parts.iter().any(|part| part.is_empty()) {
        return None;
    }
    let (catalog_arc, ns_str, table_str) = {
        // A poisoned lock still holds a usable catalog list; read through it.
        let guard = self
            .iceberg_catalogs
            .read()
            .unwrap_or_else(|e| e.into_inner());
        match parts.as_slice() {
            [ns, table] => {
                let (cat, _) = guard.first()?;
                (Arc::clone(cat), *ns, *table)
            }
            [cat_name, ns, table] => {
                let (cat, _) = guard.iter().find(|(_, n)| n == *cat_name)?;
                (Arc::clone(cat), *ns, *table)
            }
            _ => return None,
        }
    };
    let ns = iceberg::NamespaceIdent::from_vec(vec![ns_str.to_string()]).ok()?;
    let ident = iceberg::TableIdent::new(ns, table_str.to_string());
    Some((catalog_arc.as_iceberg(), ident))
}
2068
2069    /// Dispatch a `CALL system.<proc>(...)` statement to the appropriate
2070    /// Iceberg maintenance function on the first registered KrishivCatalog.
2071    #[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
2072    async fn dispatch_call_system(&self, stmt: &str) -> SqlResult<RecordBatch> {
2073        use arrow::array::{ArrayRef, Int64Array};
2074        use arrow::datatypes::{DataType, Field, Schema};
2075
2076        let upper = stmt.to_ascii_uppercase();
2077        const PREFIX: &str = "CALL SYSTEM.";
2078        let upper_after = &upper[PREFIX.len()..];
2079        let orig_after = &stmt[PREFIX.len()..];
2080
2081        let paren = upper_after.find('(').ok_or_else(|| SqlError::DataFusion {
2082            message: format!("CALL: missing '(' in: {stmt}"),
2083        })?;
2084        let proc_name = upper_after[..paren].trim();
2085
2086        let args_raw = orig_after[paren + 1..]
2087            .trim_end_matches(';')
2088            .trim()
2089            .trim_end_matches(')')
2090            .trim();
2091        let args = call_args_from_str(args_raw);
2092
2093        let iceberg_catalog = {
2094            let guard = self
2095                .iceberg_catalogs
2096                .read()
2097                .unwrap_or_else(|e| e.into_inner());
2098            guard
2099                .first()
2100                .ok_or_else(|| SqlError::DataFusion {
2101                    message: "CALL system: no Iceberg catalog registered".to_string(),
2102                })?
2103                .0
2104                .as_iceberg()
2105        };
2106
2107        let table_ref = args.first().ok_or_else(|| SqlError::DataFusion {
2108            message: format!("CALL {proc_name}: table reference argument is required"),
2109        })?;
2110        let table_ident = iceberg_table_ident(table_ref)?;
2111
2112        let count: i64 = match proc_name {
2113            "EXPIRE_SNAPSHOTS" => {
2114                let dur_s = args.get(1).ok_or_else(|| SqlError::DataFusion {
2115                    message: "CALL expire_snapshots: duration argument is required".to_string(),
2116                })?;
2117                let older_than = parse_call_duration(dur_s)?;
2118                let retain_last = args
2119                    .get(2)
2120                    .and_then(|s| s.parse::<usize>().ok())
2121                    .unwrap_or(1);
2122                krishiv_connectors::lakehouse::maintenance::expire_snapshots(
2123                    iceberg_catalog,
2124                    &table_ident,
2125                    older_than,
2126                    retain_last,
2127                )
2128                .await
2129                .map_err(|e| SqlError::DataFusion {
2130                    message: e.to_string(),
2131                })? as i64
2132            }
2133            "REMOVE_ORPHAN_FILES" => {
2134                let dur_s = args.get(1).ok_or_else(|| SqlError::DataFusion {
2135                    message: "CALL remove_orphan_files: duration argument is required".to_string(),
2136                })?;
2137                let older_than = parse_call_duration(dur_s)?;
2138                krishiv_connectors::lakehouse::maintenance::remove_orphan_files(
2139                    iceberg_catalog,
2140                    &table_ident,
2141                    older_than,
2142                )
2143                .await
2144                .map_err(|e| SqlError::DataFusion {
2145                    message: e.to_string(),
2146                })? as i64
2147            }
2148            "COMPACT_DATA_FILES" => {
2149                let target_bytes = args
2150                    .get(1)
2151                    .and_then(|s| s.parse::<u64>().ok())
2152                    .unwrap_or(128 * 1024 * 1024);
2153                krishiv_connectors::lakehouse::maintenance::compact_data_files(
2154                    iceberg_catalog,
2155                    &table_ident,
2156                    target_bytes,
2157                )
2158                .await
2159                .map_err(|e| SqlError::DataFusion {
2160                    message: e.to_string(),
2161                })? as i64
2162            }
2163            other => {
2164                return Err(SqlError::Unsupported {
2165                    feature: format!("CALL system.{other}: unknown procedure"),
2166                });
2167            }
2168        };
2169
2170        let col = match proc_name {
2171            "EXPIRE_SNAPSHOTS" => "expired_snapshots",
2172            "REMOVE_ORPHAN_FILES" => "removed_files",
2173            "COMPACT_DATA_FILES" => "rewritten_files",
2174            _ => "result",
2175        };
2176        let schema = Arc::new(Schema::new(vec![Field::new(col, DataType::Int64, false)]));
2177        let array: ArrayRef = Arc::new(Int64Array::from(vec![count]));
2178        RecordBatch::try_new(schema, vec![array]).map_err(|e| SqlError::DataFusion {
2179            message: e.to_string(),
2180        })
2181    }
2182}
2183
2184/// A query result annotated with the operation ID that produced it.
2185pub struct TaggedQueryResult {
2186    /// The caller-supplied operation ID.
2187    pub operation_id: u64,
2188    /// The underlying SQL DataFrame.
2189    pub inner: SqlDataFrame,
2190}
2191
/// Registry of cancelled operation IDs and optional progress snapshots.
///
/// Callers can cancel an in-flight operation by registering its ID here before
/// or during execution.  [`SqlEngine::execute_with_operation_id`] checks this
/// registry at the start of execution.
///
/// Cloning the registry is cheap and every clone shares the same underlying
/// state (both maps live behind `Arc`).
///
/// # Lock poisoning
///
/// Every accessor recovers the guarded data from a poisoned lock instead of
/// giving up.  A panic in an unrelated thread must never turn `cancel` into a
/// silent no-op or make `is_cancelled` report `false` for a cancelled
/// operation.
#[derive(Clone, Default)]
pub struct OperationRegistry {
    /// IDs of operations that have been cancelled and not yet removed.
    cancelled: Arc<std::sync::RwLock<std::collections::HashSet<u64>>>,
    /// Latest `(rows_scanned, rows_emitted)` snapshot per operation ID.
    progress: Arc<std::sync::RwLock<std::collections::HashMap<u64, (u64, u64)>>>,
}

impl OperationRegistry {
    /// Create a new, empty operation registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Cancel an operation by ID.  Subsequent
    /// [`execute_with_operation_id`][SqlEngine::execute_with_operation_id] calls
    /// with this ID will return [`SqlError::OperationCancelled`].
    pub fn cancel(&self, operation_id: u64) {
        self.cancelled
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(operation_id);
    }

    /// Return `true` if `operation_id` has been cancelled.
    pub fn is_cancelled(&self, operation_id: u64) -> bool {
        self.cancelled
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .contains(&operation_id)
    }

    /// Forget an operation: clears both its cancelled flag and its progress
    /// snapshot (e.g. once the operation has been cleaned up).
    pub fn remove(&self, operation_id: u64) {
        self.cancelled
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .remove(&operation_id);
        self.progress
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .remove(&operation_id);
    }

    /// Record row-level progress for an operation, replacing any previous
    /// snapshot for the same ID.
    pub fn update_progress(&self, operation_id: u64, rows_scanned: u64, rows_emitted: u64) {
        self.progress
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(operation_id, (rows_scanned, rows_emitted));
    }

    /// Return the latest `(rows_scanned, rows_emitted)` snapshot, if any.
    pub fn progress(&self, operation_id: u64) -> Option<(u64, u64)> {
        self.progress
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .get(&operation_id)
            .copied()
    }

    /// Return all currently cancelled operation IDs (in no particular order).
    pub fn cancelled_ids(&self) -> Vec<u64> {
        self.cancelled
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .iter()
            .copied()
            .collect()
    }
}
2259
2260/// Extract the table name from a `CREATE EXTERNAL TABLE <name> ...` DDL statement.
2261///
2262/// Returns `None` for any other SQL statement. Used to populate `table_row_counts`
2263/// after DDL so that `BroadcastAutoRule` can fire for connector-backed tables.
2264pub(crate) fn extract_create_external_table_name(query: &str) -> Option<String> {
2265    use datafusion::sql::parser::{DFParser, Statement as DFStatement};
2266    let mut stmts = DFParser::parse_sql(query).ok()?;
2267    match stmts.pop_front()? {
2268        DFStatement::CreateExternalTable(create) => Some(create.name.to_string()),
2269        _ => None,
2270    }
2271}
2272
2273/// Engine-agnostic interface over a prepared query result.
2274///
2275/// Hides the concrete [`SqlDataFrame`] (which holds a DataFusion `DataFrame`)
2276/// behind a stable trait so that `krishiv-api` and other callers are not
2277/// forced to depend on DataFusion types.  `datafusion` stays an implementation
2278/// detail inside `krishiv-sql`; a future engine swap only requires a new impl.
2279/// Engine-neutral grouping-set mode for canonical DataFrame aggregation.
2280pub enum GroupingMode<'a> {
2281    Sets(Vec<Vec<&'a krishiv_plan::expression::Expr>>),
2282    Cube(Vec<&'a krishiv_plan::expression::Expr>),
2283    Rollup(Vec<&'a krishiv_plan::expression::Expr>),
2284}
2285
2286#[async_trait::async_trait]
2287pub trait KrishivDataFrameOps: Send + Sync {
2288    /// Execute and collect all result batches.
2289    async fn collect(&self) -> SqlResult<Vec<RecordBatch>>;
2290    /// Execute, collect results, and return lightweight runtime statistics.
2291    async fn collect_with_stats(&self) -> SqlResult<(Vec<RecordBatch>, SqlExecutionStats)>;
2292    /// Explain the physical and logical plan text (does not execute).
2293    async fn explain(&self) -> SqlResult<String>;
2294    /// Explain the logical plan text without executing.
2295    fn explain_logical(&self) -> String;
2296    /// Build a Krishiv [`LogicalPlan`] wrapper for this DataFrame.
2297    fn krishiv_logical_plan(&self) -> LogicalPlan;
2298    /// The original SQL query string, if any.
2299    fn query(&self) -> Option<&str>;
2300    /// Execute and return a record batch stream.
2301    async fn execute_stream(&self) -> SqlResult<SqlStream>;
2302
2303    // ── DataFrame transforms (lazy) ─────────────────────────────────────────
2304
2305    /// Return the Arrow schema of this DataFrame.
2306    fn schema(&self) -> SchemaRef;
2307
2308    /// Select columns by name.
2309    async fn select(&self, columns: &[&str]) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2310
2311    /// Select arbitrary SQL expressions.
2312    async fn select_exprs(
2313        &self,
2314        expressions: &[&krishiv_plan::expression::Expr],
2315    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2316
2317    /// Group by expressions and compute aggregate expressions.
2318    async fn aggregate(
2319        &self,
2320        group_exprs: &[&krishiv_plan::expression::Expr],
2321        aggregate_exprs: &[&krishiv_plan::expression::Expr],
2322    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2323
2324    /// Aggregate using GROUPING SETS, CUBE, or ROLLUP.
2325    async fn aggregate_grouping(
2326        &self,
2327        grouping: GroupingMode<'_>,
2328        aggregate_exprs: &[&krishiv_plan::expression::Expr],
2329    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2330
2331    /// Pivot known values into aggregate columns.
2332    async fn pivot(
2333        &self,
2334        group_exprs: &[&krishiv_plan::expression::Expr],
2335        pivot_column: &krishiv_plan::expression::Expr,
2336        aggregate_expr: &krishiv_plan::expression::Expr,
2337        values: &[(krishiv_plan::expression::ScalarValue, String)],
2338    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2339
2340    /// Unpivot columns into name/value rows while preserving other columns.
2341    async fn unpivot(
2342        &self,
2343        columns: &[&str],
2344        name_column: &str,
2345        value_column: &str,
2346    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2347
2348    /// Filter rows by a SQL predicate expression.
2349    async fn filter(&self, predicate: &str) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2350
2351    /// Filter rows using the engine-owned typed expression AST.
2352    async fn filter_expr(
2353        &self,
2354        predicate: &krishiv_plan::expression::Expr,
2355    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2356
2357    /// Limit the number of rows.
2358    async fn limit(&self, n: usize) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2359
2360    /// Remove duplicate rows.
2361    async fn distinct(&self) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2362
2363    /// Drop rows with nulls in selected columns; an empty list checks all columns.
2364    async fn drop_nulls(&self, columns: &[&str]) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2365
2366    /// Bernoulli-sample rows.
2367    async fn sample(&self, fraction: f64) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2368
2369    /// Sort by columns with optional descending flags.
2370    async fn sort(
2371        &self,
2372        columns: &[&str],
2373        descending: &[bool],
2374    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2375
2376    /// Assign an alias (table name) to this DataFrame.
2377    async fn alias(&self, alias: &str) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2378
2379    /// Drop columns by name.
2380    async fn drop_columns(&self, columns: &[&str]) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2381
2382    /// Rename a column from `old` to `new`.
2383    async fn rename_column(&self, old: &str, new: &str) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2384
2385    /// Add or replace a column with a computed expression.
2386    async fn with_column(&self, name: &str, expr: &str) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2387
2388    /// Return the underlying concrete type for downcasting.
2389    fn as_any(&self) -> &dyn std::any::Any;
2390
2391    /// Compute summary statistics (delegates to DataFusion's `describe`).
2392    async fn describe(&self) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2393
2394    /// Fill null values in `column` with the literal SQL `value`.
2395    async fn fill_null(&self, column: &str, value: &str)
2396    -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2397
2398    /// Join with another DataFrame using a join type and equi-join keys.
2399    async fn join(
2400        &self,
2401        right: &dyn KrishivDataFrameOps,
2402        how: &str,
2403        left_on: &[&str],
2404        right_on: &[&str],
2405    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2406
2407    /// Union this DataFrame with another (UNION ALL semantics).
2408    async fn union(
2409        &self,
2410        right: &dyn KrishivDataFrameOps,
2411    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2412
2413    async fn union_distinct(
2414        &self,
2415        right: &dyn KrishivDataFrameOps,
2416    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2417
2418    async fn intersect(
2419        &self,
2420        right: &dyn KrishivDataFrameOps,
2421        distinct: bool,
2422    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2423
2424    async fn except(
2425        &self,
2426        right: &dyn KrishivDataFrameOps,
2427        distinct: bool,
2428    ) -> SqlResult<Box<dyn KrishivDataFrameOps>>;
2429
2430    /// Register a list of record batches as a named in-memory table in the
2431    /// same session context that backs this DataFrame.  Used by `cache()`.
2432    async fn register_batches(&self, name: &str, batches: Vec<RecordBatch>) -> SqlResult<()>;
2433
2434    /// Deregister a named table from the session context.  Used by `unpersist()`.
2435    async fn deregister_table(&self, name: &str) -> SqlResult<()>;
2436
2437    /// Create (or replace) a SQL view named `name` backed by this DataFrame's
2438    /// query.  Used by `create_or_replace_temp_view()`.
2439    async fn create_view(&self, name: &str, replace: bool) -> SqlResult<()>;
2440}
2441
2442/// Recursively walk a DataFusion `LogicalPlan` and produce Krishiv `PlanNode`
2443/// entries.  Returns `(nodes, root_id)` where `root_id` is the ID of the
2444/// top-level Krishiv node representing `plan`.
2445///
2446/// Table-scan nodes carry `estimated_rows` when the table name is found in
2447/// `table_row_counts`.  Unhandled node types fall back to a single opaque
2448/// `NodeOp::Other` node.
2449fn df_plan_to_krishiv_nodes(
2450    plan: &datafusion::logical_expr::LogicalPlan,
2451    table_row_counts: &std::collections::HashMap<String, u64>,
2452    counter: &mut usize,
2453) -> (Vec<krishiv_plan::PlanNode>, String) {
2454    use datafusion::logical_expr::LogicalPlan as DfPlan;
2455    use krishiv_plan::{ExecutionKind, NodeOp, PlanNode};
2456
2457    *counter += 1;
2458    let idx = *counter;
2459
2460    match plan {
2461        DfPlan::TableScan(ts) => {
2462            let table_name = ts.table_name.table().to_string();
2463            let row_count = table_row_counts.get(&table_name).copied();
2464            let filters: Vec<String> = ts.filters.iter().map(|e| e.to_string()).collect();
2465            let id = format!("scan-{idx}");
2466            let node = PlanNode::new(&id, format!("Scan {table_name}"), ExecutionKind::Batch)
2467                .with_op(NodeOp::Scan {
2468                    table: table_name,
2469                    filters,
2470                })
2471                .with_estimated_rows(row_count);
2472            (vec![node], id)
2473        }
2474
2475        DfPlan::Projection(proj) => {
2476            let (mut nodes, input_id) =
2477                df_plan_to_krishiv_nodes(&proj.input, table_row_counts, counter);
2478            let id = format!("proj-{idx}");
2479            let columns: Vec<String> = proj.expr.iter().map(|e| e.to_string()).collect();
2480            nodes.push(
2481                PlanNode::new(&id, "Projection", ExecutionKind::Batch)
2482                    .with_op(NodeOp::Project { columns })
2483                    .with_inputs([input_id]),
2484            );
2485            (nodes, id)
2486        }
2487
2488        DfPlan::Filter(filter) => {
2489            let (mut nodes, input_id) =
2490                df_plan_to_krishiv_nodes(&filter.input, table_row_counts, counter);
2491            let id = format!("filter-{idx}");
2492            let predicate = filter.predicate.to_string();
2493            nodes.push(
2494                PlanNode::new(&id, "Filter", ExecutionKind::Batch)
2495                    .with_op(NodeOp::Filter { predicate })
2496                    .with_inputs([input_id]),
2497            );
2498            (nodes, id)
2499        }
2500
2501        DfPlan::Aggregate(agg) => {
2502            let (mut nodes, input_id) =
2503                df_plan_to_krishiv_nodes(&agg.input, table_row_counts, counter);
2504            let id = format!("agg-{idx}");
2505            let group_keys: Vec<String> = agg.group_expr.iter().map(|e| e.to_string()).collect();
2506            nodes.push(
2507                PlanNode::new(&id, "Aggregate", ExecutionKind::Batch)
2508                    .with_op(NodeOp::Aggregate { group_keys })
2509                    .with_inputs([input_id]),
2510            );
2511            (nodes, id)
2512        }
2513
2514        DfPlan::Join(join) => {
2515            let (mut nodes, left_id) =
2516                df_plan_to_krishiv_nodes(&join.left, table_row_counts, counter);
2517            let (right_nodes, right_id) =
2518                df_plan_to_krishiv_nodes(&join.right, table_row_counts, counter);
2519            nodes.extend(right_nodes);
2520            let id = format!("join-{idx}");
2521            // T2: map every DataFusion join variant to its first-class plan
2522            // counterpart instead of silently downgrading unknowns to `Inner`.
2523            // `LeftSemi`/`RightSemi`/`LeftAnti`/`RightAnti` are the variants
2524            // that were previously collapsed.
2525            let krishiv_join_type = match join.join_type {
2526                datafusion::common::JoinType::Inner => krishiv_plan::JoinType::Inner,
2527                datafusion::common::JoinType::Left => krishiv_plan::JoinType::Left,
2528                datafusion::common::JoinType::Right => krishiv_plan::JoinType::Right,
2529                datafusion::common::JoinType::Full => krishiv_plan::JoinType::Full,
2530                datafusion::common::JoinType::LeftSemi => krishiv_plan::JoinType::LeftSemi,
2531                datafusion::common::JoinType::RightSemi => krishiv_plan::JoinType::RightSemi,
2532                datafusion::common::JoinType::LeftAnti => krishiv_plan::JoinType::LeftAnti,
2533                datafusion::common::JoinType::RightAnti => krishiv_plan::JoinType::RightAnti,
2534                // DataFusion also exposes `LeftMark`/`RightMark` for some
2535                // subquery-rewritten plans; treat them as Semi for now to
2536                // preserve the prior behaviour. Future work can split them.
2537                datafusion::common::JoinType::LeftMark => krishiv_plan::JoinType::LeftSemi,
2538                datafusion::common::JoinType::RightMark => krishiv_plan::JoinType::RightSemi,
2539            };
2540            nodes.push(
2541                PlanNode::new(&id, "Join", ExecutionKind::Batch)
2542                    .with_op(NodeOp::Join {
2543                        join_type: krishiv_join_type,
2544                    })
2545                    .with_inputs([left_id, right_id]),
2546            );
2547            (nodes, id)
2548        }
2549
2550        DfPlan::Sort(sort) => {
2551            let (mut nodes, input_id) =
2552                df_plan_to_krishiv_nodes(&sort.input, table_row_counts, counter);
2553            let id = format!("sort-{idx}");
2554            nodes.push(
2555                PlanNode::new(&id, "Sort", ExecutionKind::Batch)
2556                    .with_op(NodeOp::Other {
2557                        description: format!(
2558                            "Sort({})",
2559                            sort.expr
2560                                .iter()
2561                                .map(|e| e.to_string())
2562                                .collect::<Vec<_>>()
2563                                .join(", ")
2564                        ),
2565                    })
2566                    .with_inputs([input_id]),
2567            );
2568            (nodes, id)
2569        }
2570
2571        DfPlan::Repartition(repart) => {
2572            let (mut nodes, input_id) =
2573                df_plan_to_krishiv_nodes(&repart.input, table_row_counts, counter);
2574            let id = format!("exchange-{idx}");
2575            let partitioning = krishiv_plan::Partitioning::Unpartitioned;
2576            nodes.push(
2577                PlanNode::new(&id, "Exchange", ExecutionKind::Batch)
2578                    .with_op(NodeOp::Exchange { partitioning })
2579                    .with_inputs([input_id]),
2580            );
2581            (nodes, id)
2582        }
2583
2584        DfPlan::Limit(limit) => {
2585            let (mut nodes, input_id) =
2586                df_plan_to_krishiv_nodes(&limit.input, table_row_counts, counter);
2587            let id = format!("limit-{idx}");
2588            nodes.push(
2589                PlanNode::new(&id, "Limit", ExecutionKind::Batch)
2590                    .with_op(NodeOp::Other {
2591                        description: format!(
2592                            "Limit(skip={:?}, fetch={:?})",
2593                            limit.skip.as_ref().map(|e| e.to_string()),
2594                            limit.fetch.as_ref().map(|e| e.to_string()),
2595                        ),
2596                    })
2597                    .with_inputs([input_id]),
2598            );
2599            (nodes, id)
2600        }
2601
2602        DfPlan::Union(union) if union.inputs.len() == 1 => {
2603            if let Some(input) = union.inputs.first() {
2604                df_plan_to_krishiv_nodes(input, table_row_counts, counter)
2605            } else {
2606                (Vec::new(), String::new())
2607            }
2608        }
2609        DfPlan::Union(union) => {
2610            let mut all_nodes = Vec::new();
2611            let mut input_ids = Vec::new();
2612            for input in &union.inputs {
2613                let (sub_nodes, sub_id) =
2614                    df_plan_to_krishiv_nodes(input, table_row_counts, counter);
2615                all_nodes.extend(sub_nodes);
2616                input_ids.push(sub_id);
2617            }
2618            let id = format!("union-{idx}");
2619            all_nodes.push(
2620                PlanNode::new(&id, "Union", ExecutionKind::Batch)
2621                    .with_op(NodeOp::Other {
2622                        description: "Union".to_string(),
2623                    })
2624                    .with_inputs(input_ids),
2625            );
2626            (all_nodes, id)
2627        }
2628
2629        DfPlan::SubqueryAlias(alias) => {
2630            // SubqueryAlias is transparent; peel it and continue.
2631            df_plan_to_krishiv_nodes(&alias.input, table_row_counts, counter)
2632        }
2633
2634        DfPlan::Values(_) => {
2635            let id = format!("values-{idx}");
2636            let node = PlanNode::new(&id, "Values", ExecutionKind::Batch).with_op(NodeOp::Other {
2637                description: "Values".to_string(),
2638            });
2639            (vec![node], id)
2640        }
2641
2642        DfPlan::Extension(_) => {
2643            let id = format!("ext-{idx}");
2644            let label = plan.to_string();
2645            let node = PlanNode::new(&id, label.clone(), ExecutionKind::Batch)
2646                .with_op(NodeOp::Other { description: label });
2647            (vec![node], id)
2648        }
2649
2650        DfPlan::EmptyRelation(_) => {
2651            let id = format!("empty-{idx}");
2652            let node =
2653                PlanNode::new(&id, "EmptyRelation", ExecutionKind::Batch).with_op(NodeOp::Other {
2654                    description: "EmptyRelation".to_string(),
2655                });
2656            (vec![node], id)
2657        }
2658
2659        // Fallback: wrap the entire subplan as an opaque node.
2660        _ => {
2661            let id = format!("df-{idx}");
2662            let label = plan.to_string();
2663            let node = PlanNode::new(&id, label.clone(), ExecutionKind::Batch)
2664                .with_op(NodeOp::Other { description: label });
2665            (vec![node], id)
2666        }
2667    }
2668}
2669
2670/// Krishiv-owned wrapper around a DataFusion DataFrame.
2671#[derive(Clone)]
2672pub struct SqlDataFrame {
2673    name: String,
2674    query: Option<String>,
2675    /// Alias for `query` used by `create_view` — same value.
2676    query_text: Option<String>,
2677    execution_kind: ExecutionKind,
2678    dataframe: DataFusionDataFrame,
2679    shuffle_partitions: Option<u32>,
2680    /// Shared session context for table registration (cache/view operations).
2681    context: SessionContext,
2682    /// Estimated row counts for registered tables, keyed by table name.
2683    /// Used by `krishiv_logical_plan` to annotate scan nodes with
2684    /// `estimated_rows` so `BroadcastAutoRule` can fire.
2685    table_row_counts: Arc<std::sync::RwLock<HashMap<String, u64>>>,
2686}
2687
2688impl fmt::Debug for SqlDataFrame {
2689    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2690        f.debug_struct("SqlDataFrame")
2691            .field("name", &self.name)
2692            .field("query", &self.query)
2693            .field("shuffle_partitions", &self.shuffle_partitions)
2694            .finish_non_exhaustive()
2695    }
2696}
2697
2698impl SqlDataFrame {
2699    fn new(
2700        name: impl Into<String>,
2701        dataframe: DataFusionDataFrame,
2702        table_row_counts: Arc<std::sync::RwLock<HashMap<String, u64>>>,
2703    ) -> Self {
2704        Self {
2705            name: name.into(),
2706            query: None,
2707            query_text: None,
2708            execution_kind: ExecutionKind::Batch,
2709            dataframe,
2710            shuffle_partitions: None,
2711            context: SessionContext::default(),
2712            table_row_counts,
2713        }
2714    }
2715
2716    /// Attach the session context so cache/view operations share the live session.
2717    pub(crate) fn with_context(mut self, context: SessionContext) -> Self {
2718        self.context = context;
2719        self
2720    }
2721
2722    fn with_query(mut self, query: impl Into<String>) -> Self {
2723        let q = query.into();
2724        self.query_text = Some(q.clone());
2725        self.query = Some(q);
2726        self
2727    }
2728
2729    fn with_execution_kind(mut self, kind: ExecutionKind) -> Self {
2730        self.execution_kind = kind;
2731        self
2732    }
2733
2734    fn with_shuffle_partitions(mut self, n: Option<u32>) -> Self {
2735        self.shuffle_partitions = n;
2736        self
2737    }
2738
2739    /// Original SQL query when created from [`SqlEngine::sql`].
2740    pub fn query(&self) -> Option<&str> {
2741        self.query.as_deref()
2742    }
2743
2744    /// Return a new `SqlDataFrame` with the given DataFusion DataFrame,
2745    /// preserving the rest of this instance's state.  The new name suffix
2746    /// helps distinguish transform steps in logical-plan descriptions.
2747    fn with_new_dataframe(&self, df: DataFusionDataFrame, tag: &str) -> Self {
2748        Self {
2749            name: format!("{}-{}", self.name, tag),
2750            query: None,
2751            query_text: None,
2752            execution_kind: self.execution_kind,
2753            dataframe: df,
2754            shuffle_partitions: self.shuffle_partitions,
2755            context: self.context.clone(),
2756            table_row_counts: self.table_row_counts.clone(),
2757        }
2758    }
2759
2760    /// Create a Krishiv logical plan wrapper for this DataFrame.
2761    ///
2762    /// Walks the DataFusion logical plan tree, creating Krishiv `PlanNode`
2763    /// entries for each operator. Table-scan nodes are annotated with
2764    /// `estimated_rows` from the engine's table-row-count registry, allowing
2765    /// `BroadcastAutoRule` to identify small tables for broadcast join
2766    /// promotion. The plan is then run through the logical optimizer before
2767    /// being returned.
2768    pub fn krishiv_logical_plan(&self) -> LogicalPlan {
2769        let df_plan = self.dataframe.logical_plan();
2770        let counts = self
2771            .table_row_counts
2772            .read()
2773            .unwrap_or_else(|e| e.into_inner());
2774        let mut counter = 0usize;
2775        let (nodes, _root_id) = df_plan_to_krishiv_nodes(df_plan, &counts, &mut counter);
2776
2777        let mut plan = LogicalPlan::new(self.name.clone(), self.execution_kind);
2778        for node in nodes {
2779            plan = plan.with_node(node);
2780        }
2781
2782        // Run the logical optimizer so BroadcastAutoRule fires on eligible scans.
2783        // An optimizer failure falls back to the unoptimized (still valid) plan;
2784        // execution correctness does not depend on optimization, but the failure
2785        // must be observable rather than silent.
2786        let optimizer = krishiv_plan::optimizer::default_logical_optimizer();
2787        let fallback = plan.clone();
2788        match optimizer.optimize(plan) {
2789            Ok(result) => result.plan,
2790            Err(error) => {
2791                tracing::warn!(
2792                    plan = %self.name,
2793                    %error,
2794                    "logical optimizer failed; using unoptimized plan"
2795                );
2796                fallback
2797            }
2798        }
2799    }
2800
2801    /// Explain the logical plan without executing it.
2802    pub fn explain_logical(&self) -> String {
2803        self.dataframe.logical_plan().to_string()
2804    }
2805
2806    /// Explain logical and physical plan details through DataFusion.
2807    pub async fn explain(&self) -> SqlResult<String> {
2808        let batches = self
2809            .dataframe
2810            .clone()
2811            .explain(false, false)?
2812            .collect()
2813            .await?;
2814        pretty_batches(&batches)
2815    }
2816
2817    /// Execute and collect this DataFrame.
2818    pub async fn collect(&self) -> SqlResult<Vec<RecordBatch>> {
2819        Ok(self.dataframe.clone().collect().await?)
2820    }
2821
2822    /// Execute and return a record batch stream.
2823    pub async fn execute_stream(&self) -> SqlResult<SqlStream> {
2824        let df_stream = self.dataframe.clone().execute_stream().await?;
2825        use futures::StreamExt;
2826        let mapped = df_stream.map(|res| {
2827            res.map_err(|e| SqlError::DataFusion {
2828                message: e.to_string(),
2829            })
2830        });
2831        Ok(Box::pin(mapped))
2832    }
2833
2834    /// Execute and collect this DataFrame, also returning lightweight runtime statistics.
2835    ///
2836    /// Collects `output_rows` from DataFusion's execution metrics. `cpu_nanos`
2837    /// is approximated from `elapsed_compute` when available. `spill_bytes`
2838    /// and `spill_count` are aggregated across every operator in the physical
2839    /// plan tree (sorts, hash joins, and aggregations report spills when the
2840    /// memory pool forces them to disk); other fields default to 0.
2841    pub async fn collect_with_stats(&self) -> SqlResult<(Vec<RecordBatch>, SqlExecutionStats)> {
2842        use datafusion::physical_plan::collect as df_collect;
2843
2844        let df = self.dataframe.clone();
2845        let task_ctx = df.task_ctx();
2846        let physical_plan = df.create_physical_plan().await?;
2847
2848        let batches = df_collect(physical_plan.clone(), task_ctx.into()).await?;
2849
2850        let mut output_rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
2851        let mut cpu_nanos: u64 = 0;
2852
2853        if let Some(metrics) = physical_plan.metrics() {
2854            if let Some(v) = metrics.output_rows() {
2855                output_rows = v as u64;
2856            }
2857            if let Some(t) = metrics.elapsed_compute() {
2858                cpu_nanos = t as u64;
2859            }
2860        }
2861
2862        let (spill_bytes, spill_count) = aggregate_spill_metrics(physical_plan.as_ref());
2863
2864        Ok((
2865            batches,
2866            SqlExecutionStats {
2867                output_rows,
2868                cpu_nanos,
2869                spill_bytes,
2870                spill_count,
2871            },
2872        ))
2873    }
2874}
2875
2876/// Recursively sum `spilled_bytes` and `spill_count` metrics across every
2877/// operator in a physical plan tree.
2878///
2879/// The root node's `metrics()` only reflects the root operator; spilling
2880/// happens in interior sort/join/aggregate nodes, so the whole tree must be
2881/// walked to account for all disk spill activity.
2882fn aggregate_spill_metrics(plan: &dyn datafusion::physical_plan::ExecutionPlan) -> (u64, u64) {
2883    let mut spill_bytes: u64 = 0;
2884    let mut spill_count: u64 = 0;
2885    if let Some(metrics) = plan.metrics() {
2886        if let Some(bytes) = metrics.spilled_bytes() {
2887            spill_bytes = spill_bytes.saturating_add(bytes as u64);
2888        }
2889        if let Some(count) = metrics.spill_count() {
2890            spill_count = spill_count.saturating_add(count as u64);
2891        }
2892    }
2893    for child in plan.children() {
2894        let (child_bytes, child_count) = aggregate_spill_metrics(child.as_ref());
2895        spill_bytes = spill_bytes.saturating_add(child_bytes);
2896        spill_count = spill_count.saturating_add(child_count);
2897    }
2898    (spill_bytes, spill_count)
2899}
2900
/// Runtime counters gathered after executing a DataFusion physical plan.
///
/// All counters default to 0.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SqlExecutionStats {
    /// Number of rows produced by the query.
    pub output_rows: u64,
    /// Compute time in nanoseconds, as reported by the plan's `elapsed_compute` metric.
    pub cpu_nanos: u64,
    /// Bytes written to disk by spilling, summed over every operator in the plan.
    pub spill_bytes: u64,
    /// Count of spill events (roughly: spill files written), summed over every operator.
    pub spill_count: u64,
}
2911
/// Locate the last top-level ` AS ` (case-insensitive, single spaces on both
/// sides) in `expression` and return the byte offset of its leading space.
///
/// Occurrences inside parentheses, single-quoted strings, or double-quoted
/// identifiers are ignored. A doubled quote character inside a quoted region
/// is treated as an escaped quote. Returns `None` when no such separator exists.
fn top_level_alias_index(expression: &str) -> Option<usize> {
    /// Which quoted region, if any, the scanner is currently inside.
    #[derive(Clone, Copy)]
    enum Quote {
        Outside,
        Single,
        Double,
    }

    let bytes = expression.as_bytes();
    let mut quote = Quote::Outside;
    let mut paren_depth = 0usize;
    let mut last_match = None;
    let mut cursor = 0usize;

    while let Some(&current) = bytes.get(cursor) {
        // Number of bytes consumed by this step.
        let mut advance = 1usize;
        match (quote, current) {
            (Quote::Outside, b'\'') => quote = Quote::Single,
            (Quote::Single, b'\'') => {
                if bytes.get(cursor + 1) == Some(&b'\'') {
                    // Escaped quote: stay inside the string and skip both bytes.
                    advance = 2;
                } else {
                    quote = Quote::Outside;
                }
            }
            (Quote::Outside, b'"') => quote = Quote::Double,
            (Quote::Double, b'"') => {
                if bytes.get(cursor + 1) == Some(&b'"') {
                    advance = 2;
                } else {
                    quote = Quote::Outside;
                }
            }
            (Quote::Outside, b'(') => paren_depth += 1,
            (Quote::Outside, b')') => paren_depth = paren_depth.saturating_sub(1),
            (Quote::Outside, b' ')
                if paren_depth == 0
                    && bytes
                        .get(cursor..cursor + 4)
                        .is_some_and(|window| window.eq_ignore_ascii_case(b" AS ")) =>
            {
                last_match = Some(cursor);
                // Jump past the whole separator, trailing space included.
                advance = 4;
            }
            _ => {}
        }
        cursor += advance;
    }
    last_match
}
2956
2957fn parse_dataframe_expression(
2958    dataframe: &datafusion::dataframe::DataFrame,
2959    expression: &str,
2960) -> SqlResult<datafusion::logical_expr::Expr> {
2961    if let Some(index) = top_level_alias_index(expression) {
2962        let (body, alias) = expression.split_at(index);
2963        let alias = alias[4..].trim();
2964        if !alias.is_empty() {
2965            let alias = alias
2966                .strip_prefix('"')
2967                .and_then(|value| value.strip_suffix('"'))
2968                .unwrap_or(alias)
2969                .replace("\"\"", "\"");
2970            return Ok(dataframe.parse_sql_expr(body.trim())?.alias(alias));
2971        }
2972    }
2973    dataframe.parse_sql_expr(expression).map_err(Into::into)
2974}
2975
2976/// Parse the stable SQL-expression subset into the same engine-owned AST used by Rust and Python.
2977pub fn parse_public_expression(sql: &str) -> SqlResult<krishiv_plan::expression::Expr> {
2978    let dialect = GenericDialect {};
2979    let mut parser =
2980        Parser::new(&dialect)
2981            .try_with_sql(sql)
2982            .map_err(|error| SqlError::Unsupported {
2983                feature: format!("public expression parse: {error}"),
2984            })?;
2985    let expression = parser.parse_expr().map_err(|error| SqlError::Unsupported {
2986        feature: format!("public expression parse: {error}"),
2987    })?;
2988    sqlparser_expression_to_public(&expression)
2989}
2990
2991fn sqlparser_expression_to_public(
2992    expression: &datafusion::sql::sqlparser::ast::Expr,
2993) -> SqlResult<krishiv_plan::expression::Expr> {
2994    use datafusion::sql::sqlparser::ast::{BinaryOperator as SqlOperator, Expr as SqlExpr, Value};
2995    use krishiv_plan::expression::{BinaryOperator, Expr, ScalarValue};
2996
2997    Ok(match expression {
2998        SqlExpr::Identifier(identifier) => Expr::Column {
2999            path: vec![identifier.value.clone()],
3000        },
3001        SqlExpr::CompoundIdentifier(identifiers) => Expr::Column {
3002            path: identifiers
3003                .iter()
3004                .map(|identifier| identifier.value.clone())
3005                .collect(),
3006        },
3007        SqlExpr::Nested(expression) => sqlparser_expression_to_public(expression)?,
3008        SqlExpr::IsNull(expression) => Expr::IsNull {
3009            expression: Box::new(sqlparser_expression_to_public(expression)?),
3010            negated: false,
3011        },
3012        SqlExpr::IsNotNull(expression) => Expr::IsNull {
3013            expression: Box::new(sqlparser_expression_to_public(expression)?),
3014            negated: true,
3015        },
3016        SqlExpr::BinaryOp { left, op, right } => Expr::Binary {
3017            left: Box::new(sqlparser_expression_to_public(left)?),
3018            op: match op {
3019                SqlOperator::Eq => BinaryOperator::Eq,
3020                SqlOperator::NotEq => BinaryOperator::NotEq,
3021                SqlOperator::Gt => BinaryOperator::Gt,
3022                SqlOperator::GtEq => BinaryOperator::GtEq,
3023                SqlOperator::Lt => BinaryOperator::Lt,
3024                SqlOperator::LtEq => BinaryOperator::LtEq,
3025                SqlOperator::And => BinaryOperator::And,
3026                SqlOperator::Or => BinaryOperator::Or,
3027                SqlOperator::Plus => BinaryOperator::Plus,
3028                SqlOperator::Minus => BinaryOperator::Minus,
3029                SqlOperator::Multiply => BinaryOperator::Multiply,
3030                SqlOperator::Divide => BinaryOperator::Divide,
3031                other => {
3032                    return Err(SqlError::Unsupported {
3033                        feature: format!("public expression operator {other}"),
3034                    });
3035                }
3036            },
3037            right: Box::new(sqlparser_expression_to_public(right)?),
3038        },
3039        SqlExpr::Value(value) => Expr::Literal {
3040            value: match &value.value {
3041                Value::Null => ScalarValue::Null,
3042                Value::Boolean(value) => ScalarValue::Boolean(*value),
3043                Value::SingleQuotedString(value) => ScalarValue::Utf8(value.clone()),
3044                Value::Number(value, _)
3045                    if value.contains('.') || value.contains('e') || value.contains('E') =>
3046                {
3047                    ScalarValue::float64(value.parse::<f64>().map_err(|error| {
3048                        SqlError::Unsupported {
3049                            feature: format!("numeric expression literal: {error}"),
3050                        }
3051                    })?)
3052                }
3053                Value::Number(value, _) => {
3054                    ScalarValue::Int64(value.parse::<i64>().map_err(|error| {
3055                        SqlError::Unsupported {
3056                            feature: format!("integer expression literal: {error}"),
3057                        }
3058                    })?)
3059                }
3060                other => {
3061                    return Err(SqlError::Unsupported {
3062                        feature: format!("public expression literal {other}"),
3063                    });
3064                }
3065            },
3066        },
3067        other => {
3068            return Err(SqlError::Unsupported {
3069                feature: format!("public expression node {other}"),
3070            });
3071        }
3072    })
3073}
3074
3075fn public_data_type_to_arrow(
3076    data_type: &krishiv_plan::expression::ExprDataType,
3077) -> arrow::datatypes::DataType {
3078    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
3079    use krishiv_plan::expression::{ExprDataType, IntervalUnit as PublicIntervalUnit};
3080
3081    match data_type {
3082        ExprDataType::Null => DataType::Null,
3083        ExprDataType::Boolean => DataType::Boolean,
3084        ExprDataType::Int64 => DataType::Int64,
3085        ExprDataType::UInt64 => DataType::UInt64,
3086        ExprDataType::Float64 => DataType::Float64,
3087        ExprDataType::Utf8 => DataType::Utf8,
3088        ExprDataType::Binary => DataType::Binary,
3089        ExprDataType::Decimal128 { precision, scale } => DataType::Decimal128(*precision, *scale),
3090        ExprDataType::Date32 => DataType::Date32,
3091        ExprDataType::Timestamp { unit, timezone } => DataType::Timestamp(
3092            match unit {
3093                krishiv_plan::expression::TimeUnit::Second => TimeUnit::Second,
3094                krishiv_plan::expression::TimeUnit::Millisecond => TimeUnit::Millisecond,
3095                krishiv_plan::expression::TimeUnit::Microsecond => TimeUnit::Microsecond,
3096                krishiv_plan::expression::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
3097            },
3098            timezone.clone().map(Into::into),
3099        ),
3100        ExprDataType::Interval { unit } => DataType::Interval(match unit {
3101            PublicIntervalUnit::YearMonth => IntervalUnit::YearMonth,
3102            PublicIntervalUnit::DayTime => IntervalUnit::DayTime,
3103            PublicIntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
3104        }),
3105        ExprDataType::List(element) => DataType::List(Arc::new(Field::new(
3106            "item",
3107            public_data_type_to_arrow(element),
3108            true,
3109        ))),
3110        ExprDataType::Map { key, value } => DataType::Map(
3111            Arc::new(Field::new(
3112                "entries",
3113                DataType::Struct(
3114                    vec![
3115                        Arc::new(Field::new("key", public_data_type_to_arrow(key), false)),
3116                        Arc::new(Field::new("value", public_data_type_to_arrow(value), true)),
3117                    ]
3118                    .into(),
3119                ),
3120                false,
3121            )),
3122            false,
3123        ),
3124        ExprDataType::Struct(fields) => DataType::Struct(
3125            fields
3126                .iter()
3127                .map(|field| {
3128                    Arc::new(Field::new(
3129                        &field.name,
3130                        public_data_type_to_arrow(&field.data_type),
3131                        field.nullable,
3132                    ))
3133                })
3134                .collect::<Vec<_>>()
3135                .into(),
3136        ),
3137        // Variant: stored as JSON-encoded UTF-8 until Arrow gains a
3138        // native variant logical type. Read/write paths use Utf8
3139        // columns and the datafusion engine treats the values as
3140        // opaque strings.
3141        ExprDataType::Variant => DataType::Utf8,
3142    }
3143}
3144
3145fn public_scalar_to_datafusion(
3146    value: &krishiv_plan::expression::ScalarValue,
3147) -> Option<datafusion::common::ScalarValue> {
3148    use datafusion::common::ScalarValue;
3149    use krishiv_plan::expression::{ScalarValue as PublicScalar, TimeUnit};
3150
3151    Some(match value {
3152        PublicScalar::Null => ScalarValue::Null,
3153        PublicScalar::Boolean(value) => ScalarValue::Boolean(Some(*value)),
3154        PublicScalar::Int64(value) => ScalarValue::Int64(Some(*value)),
3155        PublicScalar::UInt64(value) => ScalarValue::UInt64(Some(*value)),
3156        PublicScalar::Float64(bits) => ScalarValue::Float64(Some(f64::from_bits(*bits))),
3157        PublicScalar::Utf8(value) => ScalarValue::Utf8(Some(value.clone())),
3158        PublicScalar::Binary(value) => ScalarValue::Binary(Some(value.clone())),
3159        PublicScalar::Decimal128 {
3160            value,
3161            precision,
3162            scale,
3163        } => ScalarValue::Decimal128(Some(*value), *precision, *scale),
3164        PublicScalar::Date32(value) => ScalarValue::Date32(Some(*value)),
3165        PublicScalar::Timestamp {
3166            value,
3167            unit,
3168            timezone,
3169        } => {
3170            let timezone = timezone.clone().map(Into::into);
3171            match unit {
3172                TimeUnit::Second => ScalarValue::TimestampSecond(Some(*value), timezone),
3173                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(*value), timezone),
3174                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(*value), timezone),
3175                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(*value), timezone),
3176            }
3177        }
3178        PublicScalar::Interval { .. } => return None,
3179    })
3180}
3181
3182/// Lower the versioned engine-owned expression contract into a DataFusion expression.
3183///
3184/// Ordinary nodes are lowered structurally. `RawSql`, generic function calls, aggregate
3185/// calls, and interval literals intentionally use DataFusion's SQL analyzer as the
3186/// compatibility/preview path until those families receive dedicated typed nodes.
3187fn lower_public_expression(
3188    dataframe: &datafusion::dataframe::DataFrame,
3189    expression: &krishiv_plan::expression::Expr,
3190) -> SqlResult<datafusion::logical_expr::Expr> {
3191    expression
3192        .validate()
3193        .map_err(|error| SqlError::Unsupported {
3194            feature: format!("invalid public expression: {error}"),
3195        })?;
3196    use datafusion::logical_expr::{Expr as DataFusionExpr, Operator, binary_expr, cast, try_cast};
3197    use krishiv_plan::expression::{BinaryOperator, Expr};
3198
3199    Ok(match expression {
3200        Expr::Column { path } if path.len() == 1 => {
3201            datafusion::prelude::col(path.first().map(String::as_str).unwrap_or(""))
3202        }
3203        Expr::Column { .. } => parse_dataframe_expression(dataframe, &expression.to_sql())?,
3204        Expr::Literal { value } => match public_scalar_to_datafusion(value) {
3205            Some(value) => DataFusionExpr::Literal(value, None),
3206            None => parse_dataframe_expression(dataframe, &expression.to_sql())?,
3207        },
3208        Expr::Alias { expression, name } => {
3209            lower_public_expression(dataframe, expression)?.alias(name)
3210        }
3211        Expr::Binary { left, op, right } => binary_expr(
3212            lower_public_expression(dataframe, left)?,
3213            match op {
3214                BinaryOperator::Eq => Operator::Eq,
3215                BinaryOperator::NotEq => Operator::NotEq,
3216                BinaryOperator::Gt => Operator::Gt,
3217                BinaryOperator::GtEq => Operator::GtEq,
3218                BinaryOperator::Lt => Operator::Lt,
3219                BinaryOperator::LtEq => Operator::LtEq,
3220                BinaryOperator::And => Operator::And,
3221                BinaryOperator::Or => Operator::Or,
3222                BinaryOperator::Plus => Operator::Plus,
3223                BinaryOperator::Minus => Operator::Minus,
3224                BinaryOperator::Multiply => Operator::Multiply,
3225                BinaryOperator::Divide => Operator::Divide,
3226            },
3227            lower_public_expression(dataframe, right)?,
3228        ),
3229        Expr::IsNull {
3230            expression,
3231            negated,
3232        } => {
3233            let expression = lower_public_expression(dataframe, expression)?;
3234            if *negated {
3235                expression.is_not_null()
3236            } else {
3237                expression.is_null()
3238            }
3239        }
3240        Expr::Cast {
3241            expression,
3242            data_type,
3243            safe,
3244        } => {
3245            let expression = lower_public_expression(dataframe, expression)?;
3246            let data_type = public_data_type_to_arrow(data_type);
3247            if *safe {
3248                try_cast(expression, data_type)
3249            } else {
3250                cast(expression, data_type)
3251            }
3252        }
3253        Expr::Sort { .. } => {
3254            return Err(SqlError::Unsupported {
3255                feature: "standalone sort expressions are only valid inside windows or order_by"
3256                    .into(),
3257            });
3258        }
3259        Expr::Aggregate { .. }
3260        | Expr::Function { .. }
3261        | Expr::Window { .. }
3262        | Expr::RawSql { .. } => parse_dataframe_expression(dataframe, &expression.to_sql())?,
3263    })
3264}
3265
3266fn sql_dataframe<'a>(
3267    dataframe: &'a dyn KrishivDataFrameOps,
3268    operation: &str,
3269) -> SqlResult<&'a SqlDataFrame> {
3270    dataframe
3271        .as_any()
3272        .downcast_ref::<SqlDataFrame>()
3273        .ok_or_else(|| SqlError::DataFusion {
3274            message: format!("right DataFrame must be SqlDataFrame for {operation}"),
3275        })
3276}
3277
3278#[async_trait::async_trait]
3279impl KrishivDataFrameOps for SqlDataFrame {
3280    async fn collect(&self) -> SqlResult<Vec<RecordBatch>> {
3281        SqlDataFrame::collect(self).await
3282    }
3283    async fn collect_with_stats(&self) -> SqlResult<(Vec<RecordBatch>, SqlExecutionStats)> {
3284        SqlDataFrame::collect_with_stats(self).await
3285    }
3286    async fn explain(&self) -> SqlResult<String> {
3287        SqlDataFrame::explain(self).await
3288    }
3289    fn explain_logical(&self) -> String {
3290        SqlDataFrame::explain_logical(self)
3291    }
3292    fn krishiv_logical_plan(&self) -> LogicalPlan {
3293        let label = self.dataframe.logical_plan().to_string();
3294        let mut plan = LogicalPlan::new(self.name.clone(), ExecutionKind::Batch).with_node(
3295            PlanNode::new("datafusion-logical", label, ExecutionKind::Batch),
3296        );
3297        if let Some(n) = self.shuffle_partitions {
3298            plan = plan.with_shuffle_partitions(Some(n));
3299        }
3300        plan
3301    }
3302    fn query(&self) -> Option<&str> {
3303        SqlDataFrame::query(self)
3304    }
3305    async fn execute_stream(&self) -> SqlResult<SqlStream> {
3306        SqlDataFrame::execute_stream(self).await
3307    }
3308
3309    // ── DataFrame transforms ────────────────────────────────────────────────
3310
3311    fn schema(&self) -> SchemaRef {
3312        SchemaRef::from(self.dataframe.schema().clone())
3313    }
3314
3315    async fn select(&self, columns: &[&str]) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3316        let df = self.dataframe.clone().select_columns(columns)?;
3317        Ok(Box::new(self.with_new_dataframe(df, "select")))
3318    }
3319
3320    async fn select_exprs(
3321        &self,
3322        expressions: &[&krishiv_plan::expression::Expr],
3323    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3324        let expressions = expressions
3325            .iter()
3326            .map(|expression| lower_public_expression(&self.dataframe, expression))
3327            .collect::<Result<Vec<_>, _>>()?;
3328        let df = self.dataframe.clone().select(expressions)?;
3329        Ok(Box::new(self.with_new_dataframe(df, "select_exprs")))
3330    }
3331
3332    async fn aggregate(
3333        &self,
3334        group_exprs: &[&krishiv_plan::expression::Expr],
3335        aggregate_exprs: &[&krishiv_plan::expression::Expr],
3336    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3337        if aggregate_exprs.is_empty() {
3338            return Err(SqlError::Unsupported {
3339                feature: "aggregate requires at least one aggregate expression".into(),
3340            });
3341        }
3342        let group_exprs = group_exprs
3343            .iter()
3344            .map(|expression| lower_public_expression(&self.dataframe, expression))
3345            .collect::<Result<Vec<_>, _>>()?;
3346        let aggregate_exprs = aggregate_exprs
3347            .iter()
3348            .map(|expression| lower_public_expression(&self.dataframe, expression))
3349            .collect::<Result<Vec<_>, _>>()?;
3350        let df = self
3351            .dataframe
3352            .clone()
3353            .aggregate(group_exprs, aggregate_exprs)?;
3354        Ok(Box::new(self.with_new_dataframe(df, "aggregate")))
3355    }
3356
3357    async fn aggregate_grouping(
3358        &self,
3359        grouping: GroupingMode<'_>,
3360        aggregate_exprs: &[&krishiv_plan::expression::Expr],
3361    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3362        if aggregate_exprs.is_empty() {
3363            return Err(SqlError::Unsupported {
3364                feature: "grouping aggregation requires at least one aggregate expression".into(),
3365            });
3366        }
3367        let lower = |expression: &&krishiv_plan::expression::Expr| {
3368            lower_public_expression(&self.dataframe, expression)
3369        };
3370        let group = match grouping {
3371            GroupingMode::Sets(sets) => datafusion::logical_expr::grouping_set(
3372                sets.into_iter()
3373                    .map(|set| set.iter().map(lower).collect::<Result<Vec<_>, _>>())
3374                    .collect::<Result<Vec<_>, _>>()?,
3375            ),
3376            GroupingMode::Cube(expressions) => datafusion::logical_expr::cube(
3377                expressions
3378                    .iter()
3379                    .map(lower)
3380                    .collect::<Result<Vec<_>, _>>()?,
3381            ),
3382            GroupingMode::Rollup(expressions) => datafusion::logical_expr::rollup(
3383                expressions
3384                    .iter()
3385                    .map(lower)
3386                    .collect::<Result<Vec<_>, _>>()?,
3387            ),
3388        };
3389        let aggregates = aggregate_exprs
3390            .iter()
3391            .map(lower)
3392            .collect::<Result<Vec<_>, _>>()?;
3393        let df = self.dataframe.clone().aggregate(vec![group], aggregates)?;
3394        Ok(Box::new(self.with_new_dataframe(df, "aggregate_grouping")))
3395    }
3396
3397    async fn pivot(
3398        &self,
3399        group_exprs: &[&krishiv_plan::expression::Expr],
3400        pivot_column: &krishiv_plan::expression::Expr,
3401        aggregate_expr: &krishiv_plan::expression::Expr,
3402        values: &[(krishiv_plan::expression::ScalarValue, String)],
3403    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3404        use krishiv_plan::expression::Expr as PublicExpr;
3405        let (function, input, distinct) = match aggregate_expr {
3406            PublicExpr::Aggregate {
3407                function,
3408                expression: Some(input),
3409                distinct,
3410            } => (*function, input.as_ref(), *distinct),
3411            _ => {
3412                return Err(SqlError::Unsupported {
3413                    feature: "pivot requires an aggregate expression with one input".into(),
3414                });
3415            }
3416        };
3417        if values.is_empty() {
3418            return Err(SqlError::Unsupported {
3419                feature: "pivot requires at least one value".into(),
3420            });
3421        }
3422        let group_exprs = group_exprs
3423            .iter()
3424            .map(|expression| lower_public_expression(&self.dataframe, expression))
3425            .collect::<Result<Vec<_>, _>>()?;
3426        let aggregates = values
3427            .iter()
3428            .map(|(value, alias)| {
3429                let conditional = PublicExpr::raw(format!(
3430                    "CASE WHEN {} = {} THEN {} END",
3431                    pivot_column.to_sql(),
3432                    value.to_sql_literal(),
3433                    input.to_sql()
3434                ));
3435                let aggregate = PublicExpr::Aggregate {
3436                    function,
3437                    expression: Some(Box::new(conditional)),
3438                    distinct,
3439                }
3440                .alias(alias);
3441                lower_public_expression(&self.dataframe, &aggregate)
3442            })
3443            .collect::<Result<Vec<_>, _>>()?;
3444        let dataframe = self.dataframe.clone().aggregate(group_exprs, aggregates)?;
3445        Ok(Box::new(self.with_new_dataframe(dataframe, "pivot")))
3446    }
3447
3448    async fn unpivot(
3449        &self,
3450        columns: &[&str],
3451        name_column: &str,
3452        value_column: &str,
3453    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3454        if columns.is_empty() {
3455            return Err(SqlError::Unsupported {
3456                feature: "unpivot requires at least one column".into(),
3457            });
3458        }
3459        let retained = self
3460            .dataframe
3461            .schema()
3462            .fields()
3463            .iter()
3464            .map(|field| field.name().as_str())
3465            .filter(|name| !columns.contains(name))
3466            .collect::<Vec<_>>();
3467        let mut branches = Vec::with_capacity(columns.len());
3468        for column in columns {
3469            let mut expressions = retained
3470                .iter()
3471                .map(|name| datafusion::logical_expr::col(*name))
3472                .collect::<Vec<_>>();
3473            expressions
3474                .push(datafusion::logical_expr::lit((*column).to_owned()).alias(name_column));
3475            expressions.push(datafusion::logical_expr::col(*column).alias(value_column));
3476            branches.push(self.dataframe.clone().select(expressions)?);
3477        }
3478        let mut branches = branches.into_iter();
3479        let Some(mut dataframe) = branches.next() else {
3480            return Err(SqlError::Unsupported {
3481                feature: "unpivot requires at least one branch".into(),
3482            });
3483        };
3484        for branch in branches {
3485            dataframe = dataframe.union(branch)?;
3486        }
3487        Ok(Box::new(self.with_new_dataframe(dataframe, "unpivot")))
3488    }
3489
3490    async fn filter(&self, predicate: &str) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3491        let expr = self.dataframe.parse_sql_expr(predicate)?;
3492        let df = self.dataframe.clone().filter(expr)?;
3493        Ok(Box::new(self.with_new_dataframe(df, "filter")))
3494    }
3495
3496    async fn filter_expr(
3497        &self,
3498        predicate: &krishiv_plan::expression::Expr,
3499    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3500        let expr = lower_public_expression(&self.dataframe, predicate)?;
3501        let df = self.dataframe.clone().filter(expr)?;
3502        Ok(Box::new(self.with_new_dataframe(df, "filter_expr")))
3503    }
3504
3505    async fn limit(&self, n: usize) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3506        let df = self.dataframe.clone().limit(0, Some(n))?;
3507        Ok(Box::new(self.with_new_dataframe(df, "limit")))
3508    }
3509
3510    async fn distinct(&self) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3511        let df = self.dataframe.clone().distinct()?;
3512        Ok(Box::new(self.with_new_dataframe(df, "distinct")))
3513    }
3514
3515    async fn drop_nulls(&self, columns: &[&str]) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3516        let columns = if columns.is_empty() {
3517            self.dataframe
3518                .schema()
3519                .fields()
3520                .iter()
3521                .map(|field| field.name().as_str())
3522                .collect::<Vec<_>>()
3523        } else {
3524            columns.to_vec()
3525        };
3526        let mut predicate: Option<datafusion::logical_expr::Expr> = None;
3527        for column in columns {
3528            let next = datafusion::logical_expr::col(column).is_not_null();
3529            predicate = Some(match predicate {
3530                Some(current) => current.and(next),
3531                None => next,
3532            });
3533        }
3534        let df = match predicate {
3535            Some(predicate) => self.dataframe.clone().filter(predicate)?,
3536            None => self.dataframe.clone(),
3537        };
3538        Ok(Box::new(self.with_new_dataframe(df, "drop_nulls")))
3539    }
3540
3541    async fn sample(&self, fraction: f64) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3542        if !(0.0..=1.0).contains(&fraction) {
3543            return Err(SqlError::Unsupported {
3544                feature: "sample fraction must be between 0 and 1".into(),
3545            });
3546        }
3547        let predicate = self
3548            .dataframe
3549            .parse_sql_expr(&format!("random() < {fraction}"))?;
3550        let df = self.dataframe.clone().filter(predicate)?;
3551        Ok(Box::new(self.with_new_dataframe(df, "sample")))
3552    }
3553
3554    async fn sort(
3555        &self,
3556        columns: &[&str],
3557        descending: &[bool],
3558    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3559        use datafusion::logical_expr::SortExpr;
3560        let exprs: Vec<SortExpr> = columns
3561            .iter()
3562            .zip(descending.iter())
3563            .map(|(col_name, desc)| datafusion::logical_expr::col(*col_name).sort(!desc, *desc))
3564            .collect();
3565        let df = self.dataframe.clone().sort(exprs)?;
3566        Ok(Box::new(self.with_new_dataframe(df, "sort")))
3567    }
3568
3569    async fn alias(&self, alias: &str) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3570        let df = self.dataframe.clone().alias(alias)?;
3571        Ok(Box::new(self.with_new_dataframe(df, "alias")))
3572    }
3573
3574    async fn drop_columns(&self, columns: &[&str]) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3575        let df = self.dataframe.clone().drop_columns(columns)?;
3576        Ok(Box::new(self.with_new_dataframe(df, "drop")))
3577    }
3578
3579    async fn rename_column(&self, old: &str, new: &str) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3580        let df = self.dataframe.clone().with_column_renamed(old, new)?;
3581        Ok(Box::new(self.with_new_dataframe(df, "rename")))
3582    }
3583
3584    async fn with_column(&self, name: &str, expr: &str) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3585        let parsed = self.dataframe.parse_sql_expr(expr)?;
3586        let df = self.dataframe.clone().with_column(name, parsed)?;
3587        Ok(Box::new(self.with_new_dataframe(df, "with_column")))
3588    }
3589
    /// Expose this frame as [`std::any::Any`] so it can be downcast to its
    /// concrete type (as the binary operations in this impl do for their
    /// right-hand operand).
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
3593
3594    async fn describe(&self) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3595        let df = self.dataframe.clone().describe().await?;
3596        Ok(Box::new(self.with_new_dataframe(df, "describe")))
3597    }
3598
3599    async fn fill_null(
3600        &self,
3601        column: &str,
3602        value: &str,
3603    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3604        let expr = format!("COALESCE({column}, {value})");
3605        let parsed = self.dataframe.parse_sql_expr(&expr)?;
3606        let df = self.dataframe.clone().with_column(column, parsed)?;
3607        Ok(Box::new(self.with_new_dataframe(df, "fill_null")))
3608    }
3609
3610    async fn join(
3611        &self,
3612        right: &dyn KrishivDataFrameOps,
3613        how: &str,
3614        left_on: &[&str],
3615        right_on: &[&str],
3616    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3617        let right_sql = right
3618            .as_any()
3619            .downcast_ref::<SqlDataFrame>()
3620            .ok_or_else(|| SqlError::DataFusion {
3621                message: "right DataFrame must be SqlDataFrame for join".into(),
3622            })?;
3623        use datafusion::common::JoinType;
3624        let join_type = match how.to_lowercase().as_str() {
3625            "inner" => JoinType::Inner,
3626            "left" => JoinType::Left,
3627            "right" => JoinType::Right,
3628            "full" | "outer" => JoinType::Full,
3629            "leftsemi" | "left_semi" => JoinType::LeftSemi,
3630            "rightsemi" | "right_semi" => JoinType::RightSemi,
3631            "leftanti" | "left_anti" => JoinType::LeftAnti,
3632            "rightanti" | "right_anti" => JoinType::RightAnti,
3633            _ => {
3634                return Err(SqlError::DataFusion {
3635                    message: format!("unsupported join type: {how}"),
3636                });
3637            }
3638        };
3639        let df = self.dataframe.clone().join(
3640            right_sql.dataframe.clone(),
3641            join_type,
3642            left_on,
3643            right_on,
3644            None,
3645        )?;
3646        Ok(Box::new(self.with_new_dataframe(df, "join")))
3647    }
3648
3649    async fn union(
3650        &self,
3651        right: &dyn KrishivDataFrameOps,
3652    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3653        let right_sql = right
3654            .as_any()
3655            .downcast_ref::<SqlDataFrame>()
3656            .ok_or_else(|| SqlError::DataFusion {
3657                message: "right DataFrame must be SqlDataFrame for union".into(),
3658            })?;
3659        let df = self.dataframe.clone().union(right_sql.dataframe.clone())?;
3660        Ok(Box::new(self.with_new_dataframe(df, "union")))
3661    }
3662
3663    async fn union_distinct(
3664        &self,
3665        right: &dyn KrishivDataFrameOps,
3666    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3667        let right = sql_dataframe(right, "union_distinct")?;
3668        let df = self
3669            .dataframe
3670            .clone()
3671            .union_distinct(right.dataframe.clone())?;
3672        Ok(Box::new(self.with_new_dataframe(df, "union_distinct")))
3673    }
3674
3675    async fn intersect(
3676        &self,
3677        right: &dyn KrishivDataFrameOps,
3678        distinct: bool,
3679    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3680        let right = sql_dataframe(right, "intersect")?;
3681        let df = if distinct {
3682            self.dataframe
3683                .clone()
3684                .intersect_distinct(right.dataframe.clone())?
3685        } else {
3686            self.dataframe.clone().intersect(right.dataframe.clone())?
3687        };
3688        Ok(Box::new(self.with_new_dataframe(df, "intersect")))
3689    }
3690
3691    async fn except(
3692        &self,
3693        right: &dyn KrishivDataFrameOps,
3694        distinct: bool,
3695    ) -> SqlResult<Box<dyn KrishivDataFrameOps>> {
3696        let right = sql_dataframe(right, "except")?;
3697        let df = if distinct {
3698            self.dataframe
3699                .clone()
3700                .except_distinct(right.dataframe.clone())?
3701        } else {
3702            self.dataframe.clone().except(right.dataframe.clone())?
3703        };
3704        Ok(Box::new(self.with_new_dataframe(df, "except")))
3705    }
3706
3707    async fn register_batches(&self, name: &str, batches: Vec<RecordBatch>) -> SqlResult<()> {
3708        let schema = batches
3709            .first()
3710            .map(|b| b.schema())
3711            .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
3712        let mem_table =
3713            datafusion::datasource::MemTable::try_new(schema, vec![batches]).map_err(|e| {
3714                SqlError::DataFusion {
3715                    message: e.to_string(),
3716                }
3717            })?;
3718        self.context
3719            .register_table(name, Arc::new(mem_table))
3720            .map_err(SqlError::from)?;
3721        Ok(())
3722    }
3723
3724    async fn deregister_table(&self, name: &str) -> SqlResult<()> {
3725        let _ = self
3726            .context
3727            .deregister_table(name)
3728            .map_err(SqlError::from)?;
3729        Ok(())
3730    }
3731
3732    async fn create_view(&self, name: &str, replace: bool) -> SqlResult<()> {
3733        let query = self
3734            .query_text
3735            .as_deref()
3736            .ok_or_else(|| SqlError::DataFusion {
3737                message: "create_view requires an SQL query string on the DataFrame".into(),
3738            })?;
3739        let or_replace = if replace { "OR REPLACE " } else { "" };
3740        let safe_name = quote_identifier(name);
3741        let view_sql = format!("CREATE {or_replace}VIEW {safe_name} AS {query}");
3742        self.context.sql(&view_sql).await?;
3743        Ok(())
3744    }
3745}
3746
3747use krishiv_common::sql_util::quote_identifier;
3748
3749// ── CALL-system helpers ───────────────────────────────────────────────────────
3750
3751/// Extract positional arguments from the body of a `CALL` statement.
3752///
3753/// Handles single-quoted string literals and bare integers.
3754/// `'catalog.ns.table', '7 days', 5` → `["catalog.ns.table", "7 days", "5"]`
3755#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
fn call_args_from_str(s: &str) -> Vec<String> {
    // Scanner state:
    //   in_str    - currently inside a single-quoted literal
    //   after_str - a literal just closed; ignore everything up to the next comma
    let mut args: Vec<String> = Vec::new();
    let mut cur = String::new();
    let mut in_str = false;
    let mut after_str = false;
    let mut chars = s.chars().peekable();
    while let Some(ch) = chars.next() {
        if after_str {
            if ch == ',' {
                after_str = false;
            }
            continue;
        }
        if in_str {
            if ch != '\'' {
                cur.push(ch);
            } else if chars.next_if_eq(&'\'').is_some() {
                // A doubled quote inside a literal is an escaped quote
                // character, not the end of the literal.
                cur.push('\'');
            } else {
                in_str = false;
                after_str = true;
                // Literals are pushed untrimmed (and may be empty): whatever
                // is between the quotes is the argument.
                args.push(std::mem::take(&mut cur));
            }
        } else if ch == '\'' {
            // Whitespace separating the previous comma from this opening quote
            // is not part of the literal; without this, `'a', 'b'` would
            // produce " b" for the second argument.
            if cur.trim().is_empty() {
                cur.clear();
            }
            in_str = true;
        } else if ch == ',' {
            let t = cur.trim().to_string();
            if !t.is_empty() {
                args.push(t);
            }
            cur.clear();
        } else {
            cur.push(ch);
        }
    }
    // Flush a trailing bare argument (or an unterminated literal).
    let t = cur.trim().to_string();
    if !t.is_empty() {
        args.push(t);
    }
    args
}
3794
/// Build an Iceberg `TableIdent` from a dotted table reference.
///
/// Accepts:
/// - `"namespace.table"` — single-level namespace
/// - `"catalog.namespace.table"` — the catalog prefix is ignored (catalog is
///   selected by registration order, not by name, in the CALL dispatch)
///
/// The reference is split on at most the first two dots, so any further dots
/// remain part of the table name.
///
/// # Errors
/// Returns `SqlError::DataFusion` when the reference contains no dot, or when
/// the namespace identifier cannot be constructed.
#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
fn iceberg_table_ident(table_ref: &str) -> SqlResult<iceberg::TableIdent> {
    let segments: Vec<&str> = table_ref.splitn(3, '.').collect();
    // Two segments are `ns.table`; three are `catalog.ns.table` with the
    // catalog segment discarded.
    let (namespace, table) = match segments.as_slice() {
        [namespace, table] | [_, namespace, table] => (*namespace, *table),
        _ => {
            return Err(SqlError::DataFusion {
                message: format!(
                    "invalid table reference '{table_ref}': expected 'ns.table' or 'cat.ns.table'"
                ),
            });
        }
    };
    let ns = iceberg::NamespaceIdent::from_vec(vec![namespace.to_string()]).map_err(|e| {
        SqlError::DataFusion {
            message: e.to_string(),
        }
    })?;
    Ok(iceberg::TableIdent::new(ns, table.to_string()))
}
3836
/// Parse a human-readable duration string into a [`chrono::Duration`].
///
/// The input is `"<integer> <unit>"`. The unit is case-insensitive, trailing
/// `s` characters are ignored, and the accepted units are `day`, `hour`,
/// `week`, `minute` and `min` (so `"7 days"`, `"1 Hour"` and `"30 mins"` all
/// parse).
///
/// # Errors
/// Returns `SqlError::DataFusion` when the leading value is not an integer,
/// when the unit is not recognised, or when the resulting duration is outside
/// the range `chrono::Duration` can represent.
#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
fn parse_call_duration(s: &str) -> SqlResult<chrono::Duration> {
    let s = s.trim();
    let mut it = s.splitn(2, ' ');
    let n: i64 = it
        .next()
        .and_then(|v| v.parse().ok())
        .ok_or_else(|| SqlError::DataFusion {
            message: format!("invalid duration value in '{s}'"),
        })?;
    let unit = it.next().unwrap_or("").trim().to_ascii_lowercase();
    // The plain `Duration::days`/`hours`/... constructors panic on overflow.
    // The value comes from user-supplied CALL arguments, so use the checked
    // constructors and report an error instead.
    let duration = match unit.trim_end_matches('s') {
        "day" => chrono::Duration::try_days(n),
        "hour" => chrono::Duration::try_hours(n),
        "week" => chrono::Duration::try_weeks(n),
        "minute" | "min" => chrono::Duration::try_minutes(n),
        _ => {
            return Err(SqlError::DataFusion {
                message: format!("unknown duration unit '{unit}' in '{s}'"),
            });
        }
    };
    duration.ok_or_else(|| SqlError::DataFusion {
        message: format!("duration '{s}' is out of range"),
    })
}
3862
3863// ── Iceberg DML helpers ───────────────────────────────────────────────────────
3864
/// Split `DELETE FROM <table> [WHERE <predicate>]` into `(table_ref, predicate)`.
///
/// Parsing goes through the sqlparser AST, which correctly handles quoted
/// identifiers, comments, and subqueries in predicates.
///
/// Returns `None` when the text does not parse, does not contain exactly one
/// statement, is not a DELETE, or when the first FROM item is not a plain table.
/// A missing WHERE clause is returned as `"TRUE"` (delete all rows).
#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
fn parse_dml_delete(stmt: &str) -> Option<(String, String)> {
    use datafusion::sql::sqlparser::ast::{FromTable, Statement, TableFactor};
    use datafusion::sql::sqlparser::dialect::GenericDialect;
    use datafusion::sql::sqlparser::parser::Parser;

    let mut parsed = Parser::parse_sql(&GenericDialect {}, stmt).ok()?;
    if parsed.len() != 1 {
        return None;
    }
    let Some(Statement::Delete(delete)) = parsed.pop() else {
        return None;
    };
    // `Delete::from` is a `FromTable` enum (sqlparser ≥0.54); both variants
    // carry the table list, and its first entry is the deletion target.
    let from_tables = match delete.from {
        FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
    };
    let target = from_tables.into_iter().next()?;
    let TableFactor::Table { name, .. } = target.relation else {
        return None;
    };
    let predicate = match delete.selection {
        Some(expr) => expr.to_string(),
        None => "TRUE".to_string(),
    };
    Some((name.to_string(), predicate))
}
3899
/// Parsed UPDATE statement, decomposed into its components for Iceberg DML.
///
/// All parts are stored as SQL text rendered back from the parsed AST.
#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
struct ParsedUpdate {
    /// Name of the table being updated, as written in the statement.
    table_ref: String,
    /// Ordered (column_name, value_expression) pairs from the SET clause.
    assignments: Vec<(String, String)>,
    /// WHERE predicate text; `None` when the statement has no WHERE clause.
    predicate: Option<String>,
}
3908
/// Decompose `UPDATE <table> SET col = expr [, …] [WHERE <predicate>]` into a
/// [`ParsedUpdate`] using the sqlparser AST.
///
/// Returns `None` when the text does not parse, does not contain exactly one
/// statement, is not an UPDATE, targets something other than a plain table, or
/// has no assignments.
///
/// Replaces the former regex implementation which could not handle quoted
/// identifiers, expressions with commas, or subqueries in predicates.
#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
fn parse_dml_update(stmt: &str) -> Option<ParsedUpdate> {
    use datafusion::sql::sqlparser::ast::{Statement, TableFactor};
    use datafusion::sql::sqlparser::dialect::GenericDialect;
    use datafusion::sql::sqlparser::parser::Parser;

    let mut parsed = Parser::parse_sql(&GenericDialect {}, stmt).ok()?;
    if parsed.len() != 1 {
        return None;
    }
    // `Statement::Update` wraps an `Update` struct (sqlparser ≥0.55).
    let Some(Statement::Update(update)) = parsed.pop() else {
        return None;
    };
    let TableFactor::Table { name, .. } = update.table.relation else {
        return None;
    };
    // Render each AST assignment as a (column_name, expression_string) pair.
    // `target` is `AssignmentTarget::ColumnName(ObjectName)` in 0.61.
    let mut assignments = Vec::with_capacity(update.assignments.len());
    for assignment in update.assignments {
        assignments.push((
            assignment.target.to_string(),
            assignment.value.to_string(),
        ));
    }
    if assignments.is_empty() {
        return None;
    }
    let predicate = update.selection.map(|expr| expr.to_string());
    Some(ParsedUpdate {
        table_ref: name.to_string(),
        assignments,
        predicate,
    })
}
3952
3953/// Create a Krishiv logical plan wrapper for a SQL query without executing it.
3954pub fn plan_sql(query: impl Into<String>) -> SqlResult<SqlPlan> {
3955    let query = query.into();
3956    if query.trim().is_empty() {
3957        return Err(SqlError::EmptyQuery);
3958    }
3959
3960    if let Some(stmt) = cep_sql::parse_match_recognize(&query)? {
3961        let logical_plan = cep_sql::plan_match_recognize(stmt, &query);
3962        let optimized = Optimizer::default().optimize(logical_plan)?;
3963        return Ok(SqlPlan {
3964            query,
3965            logical_plan: optimized.plan,
3966        });
3967    }
3968
3969    let logical_plan =
3970        LogicalPlan::new("sql-query", ExecutionKind::Batch).with_node(PlanNode::new(
3971            "sql",
3972            format!("sql: {}", query.trim()),
3973            ExecutionKind::Batch,
3974        ));
3975
3976    let optimized = Optimizer::default().optimize(logical_plan)?;
3977    Ok(SqlPlan {
3978        query,
3979        logical_plan: optimized.plan,
3980    })
3981}
3982
3983/// Create bootstrap `EXPLAIN` text for a SQL query.
3984pub fn explain_sql(query: impl Into<String>) -> SqlResult<String> {
3985    let plan = plan_sql(query)?;
3986    Ok(plan.logical_plan().describe())
3987}
3988
3989/// Explain a SQL query including optimizer rule decisions.
3990///
3991/// Runs the logical plan through `optimizer` and appends the optimizer
3992/// summary to the plan description.
3993pub fn explain_sql_optimized(query: impl Into<String>, optimizer: &Optimizer) -> SqlResult<String> {
3994    let plan = plan_sql(query)?;
3995    let result = optimizer.optimize(plan.logical_plan().clone())?;
3996    let mut output = result.plan.describe();
3997    let optimizer_line = result.describe();
3998    output.push('\n');
3999    output.push_str(&optimizer_line);
4000    Ok(output)
4001}
4002
4003/// Explain a SQL query and append a cost estimate from the provided cost model.
4004pub fn explain_sql_with_cost(
4005    query: impl Into<String>,
4006    cost_model: &dyn CostModel,
4007) -> SqlResult<String> {
4008    let plan = plan_sql(query)?;
4009    let cost = cost_model.estimate(plan.logical_plan());
4010    let mut output = plan.logical_plan().describe();
4011    output.push_str(&format!(
4012        "\ncost: cpu_nanos={}, memory_bytes={}, network_bytes={}",
4013        cost.cpu_nanos, cost.memory_bytes, cost.network_bytes
4014    ));
4015    Ok(output)
4016}
4017
4018/// Return all base table/relation names referenced by `query`.
4019///
4020/// This uses the same SQL parser family as DataFusion, so policy checks cover
4021/// joins, subqueries, CTE bodies, and other nested relation references instead
4022/// of relying on a single best-effort `FROM` token.
4023pub fn referenced_table_names(query: impl AsRef<str>) -> SqlResult<Vec<String>> {
4024    let query = query.as_ref();
4025    if query.trim().is_empty() {
4026        return Err(SqlError::EmptyQuery);
4027    }
4028
4029    let statements =
4030        Parser::parse_sql(&GenericDialect {}, query).map_err(|e| SqlError::DataFusion {
4031            message: format!("SQL parse error: {e}"),
4032        })?;
4033    let mut names = BTreeSet::new();
4034    let _ = visit_relations(&statements, |relation| {
4035        names.insert(relation.to_string());
4036        ControlFlow::<()>::Continue(())
4037    });
4038    Ok(names.into_iter().collect())
4039}
4040
4041/// Format Arrow batches for CLI and tests.
4042pub fn pretty_batches(batches: &[RecordBatch]) -> SqlResult<String> {
4043    Ok(pretty_format_batches(batches)
4044        .map_err(|error| SqlError::DataFusion {
4045            message: error.to_string(),
4046        })?
4047        .to_string())
4048}
4049
4050#[cfg(test)]
4051mod sql_tests;