// vgi 0.1.2
//
// Build VGI workers in Rust to extend DuckDB with custom catalogs, functions, and tables over Apache Arrow IPC.
// Documentation
// Copyright 2025, 2026 Query Farm LLC - https://query.farm

//! Core function model shared by all VGI function kinds.
//!
//! Mirrors the Go `function.go` / `bind.go` / `scalar.go` surface and the
//! canonical Python base classes.

use std::sync::Arc;

use arrow_schema::{DataType, SchemaRef};
use vgi_rpc::Result;

use crate::protocol::enums;

/// A named type-bound predicate for ANY-typed arguments.
///
/// Checked at bind time by `validate_type_bounds`: the input field's type
/// must satisfy `pred`, otherwise bind fails with an error that carries the
/// bound's `name` (mirrors Python's `type_bound=<predicate>`).
#[derive(Clone, Copy)]
pub struct TypeBound {
    /// Identifier reported in the bind error when `pred` rejects a type.
    pub name: &'static str,
    /// Returns `true` when the given Arrow type satisfies this bound.
    pub pred: fn(&DataType) -> bool,
}

/// Compact debug form `TypeBound(<name>)`; the predicate function pointer is
/// not printed.
impl std::fmt::Debug for TypeBound {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_str("TypeBound(")?;
        out.write_str(self.name)?;
        out.write_str(")")
    }
}

/// Type bound `_is_addable_type`: accepts integer, floating-point, decimal,
/// and temporal Arrow types (the predicate is `is_addable`).
pub const ADDABLE: TypeBound = TypeBound {
    name: "_is_addable_type",
    pred: is_addable,
};
/// Type bound `_is_multipliable_type`: accepts integer, floating-point, and
/// decimal Arrow types, but no temporal types (the predicate is
/// `is_multipliable`).
pub const MULTIPLIABLE: TypeBound = TypeBound {
    name: "_is_multipliable_type",
    pred: is_multipliable,
};

fn is_integer(t: &DataType) -> bool {
    use DataType::*;
    matches!(
        t,
        Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64
    )
}
fn is_floating(t: &DataType) -> bool {
    matches!(t, DataType::Float16 | DataType::Float32 | DataType::Float64)
}
fn is_decimal(t: &DataType) -> bool {
    matches!(t, DataType::Decimal128(_, _) | DataType::Decimal256(_, _))
}
fn is_temporal(t: &DataType) -> bool {
    use DataType::*;
    matches!(
        t,
        Date32 | Date64 | Time32(_) | Time64(_) | Timestamp(_, _) | Duration(_) | Interval(_)
    )
}
fn is_addable(t: &DataType) -> bool {
    is_integer(t) || is_floating(t) || is_decimal(t) || is_temporal(t)
}
fn is_multipliable(t: &DataType) -> bool {
    is_integer(t) || is_floating(t) || is_decimal(t)
}

/// Per-argument specification, used to build the function's wire arg schema
/// (`FunctionInfo.arguments`) and validate type bounds at bind time.
///
/// Create one with the constructors on this type (`column`, `any_column`,
/// `column_typed`, `const_arg`, `const_typed`) and refine it with `varargs`,
/// `as_const`, or `with_bound`.
#[derive(Debug, Clone)]
pub struct ArgSpec {
    /// Argument name (the struct field name; empty for positional-only).
    pub name: String,
    /// 0-based positional index; `-1` for named-only. `validate_type_bounds`
    /// skips any spec whose position is negative.
    pub position: i32,
    /// VGI arg type string: `"int64"`, `"varchar"`, `"any"`, `"table"`, …
    /// The typed constructors (`column_typed`, `const_typed`) leave this empty.
    pub arrow_type: String,
    /// Doc string.
    pub doc: String,
    /// Constant (bind-time scalar) parameter.
    pub is_const: bool,
    /// Variadic parameter.
    pub is_varargs: bool,
    /// Optional concrete Arrow type (takes precedence over `arrow_type`).
    pub arrow_data_type: Option<DataType>,
    /// Optional bind-time type bound for ANY-typed args; enforced by
    /// `validate_type_bounds`.
    pub type_bound: Option<TypeBound>,
}

impl ArgSpec {
    fn base(name: &str, position: i32, arrow_type: &str, doc: &str) -> Self {
        ArgSpec {
            name: name.to_string(),
            position,
            arrow_type: arrow_type.to_string(),
            doc: doc.to_string(),
            is_const: false,
            is_varargs: false,
            arrow_data_type: None,
            type_bound: None,
        }
    }

    /// A positional, non-const ANY-typed column argument.
    pub fn any_column(name: &str, position: i32, doc: &str) -> Self {
        Self::base(name, position, "any", doc)
    }

    /// A positional, non-const column argument of a concrete VGI type string
    /// (e.g. `"int32"`, `"varchar"`, `"binary"`).
    pub fn column(name: &str, position: i32, arrow_type: &str, doc: &str) -> Self {
        Self::base(name, position, arrow_type, doc)
    }

    /// A positional column argument with an explicit Arrow type.
    pub fn column_typed(name: &str, position: i32, ty: DataType, doc: &str) -> Self {
        let mut s = Self::base(name, position, "", doc);
        s.arrow_data_type = Some(ty);
        s
    }

    /// A positional const (bind-time scalar) argument of a concrete VGI type.
    pub fn const_arg(name: &str, position: i32, arrow_type: &str, doc: &str) -> Self {
        let mut s = Self::base(name, position, arrow_type, doc);
        s.is_const = true;
        s
    }

    /// A positional const argument with an explicit Arrow type.
    pub fn const_typed(name: &str, position: i32, ty: DataType, doc: &str) -> Self {
        let mut s = Self::base(name, position, "", doc);
        s.is_const = true;
        s.arrow_data_type = Some(ty);
        s
    }

    /// Mark this spec variadic (consumes all remaining columns).
    pub fn varargs(mut self) -> Self {
        self.is_varargs = true;
        self
    }

    /// Mark this spec const.
    pub fn as_const(mut self) -> Self {
        self.is_const = true;
        self
    }

    /// Attach a type bound.
    pub fn with_bound(mut self, bound: TypeBound) -> Self {
        self.type_bound = Some(bound);
        self
    }
}

/// Check every bounded, positional spec against the input schema.
///
/// A spec is checked only when it carries a `type_bound`, has a non-negative
/// `position`, and that position exists in the schema; anything else is
/// skipped silently. With no input schema at all, the call succeeds.
///
/// # Errors
///
/// Returns a value error naming the failed bound (matching Python's
/// `SchemaValidationError`) for the first spec whose field type is rejected
/// by its predicate.
pub fn validate_type_bounds(specs: &[ArgSpec], input_schema: Option<&SchemaRef>) -> Result<()> {
    let fields = match input_schema {
        Some(schema) => schema.fields(),
        None => return Ok(()),
    };

    // Named-only specs (negative position) and unbounded specs are not checked.
    let bounded = specs
        .iter()
        .filter(|spec| spec.position >= 0)
        .filter_map(|spec| spec.type_bound.map(|bound| (spec, bound)));

    for (spec, bound) in bounded {
        // The cast is lossless: negative positions were filtered out above.
        // NOTE(review): for a varargs spec only the column at `position` is
        // examined here, not the remaining columns — confirm this is intended.
        let Some(field) = fields.get(spec.position as usize) else {
            continue;
        };
        let actual = field.data_type();
        if (bound.pred)(actual) {
            continue;
        }
        return Err(vgi_rpc::RpcError::value_error(format!(
            "{}: argument {} of type {} does not satisfy {}",
            bound.name, spec.name, actual, bound.name
        )));
    }
    Ok(())
}

/// Optimizer- and discovery-facing function metadata (`FunctionInfo`).
///
/// The `Default` impl leaves every flag `false` and every optional field
/// `None`, except `stability`, which it sets to
/// `enums::stability::CONSISTENT`.
#[derive(Debug, Clone)]
pub struct FunctionMetadata {
    /// Description text for the function.
    pub description: String,
    /// Stability classification string (defaults to
    /// `enums::stability::CONSISTENT`).
    pub stability: Option<String>,
    /// Null-handling mode string, if declared.
    /// NOTE(review): the accepted values are not visible in this file —
    /// presumably defined alongside the other protocol enums; confirm.
    pub null_handling: Option<String>,
    /// Category labels for the function.
    pub categories: Vec<String>,
    /// Fixed scalar return type, when not computed dynamically at bind.
    pub return_type: Option<DataType>,
    /// Pushdown opt-ins: projection, filter, and sampling respectively.
    pub projection_pushdown: bool,
    pub filter_pushdown: bool,
    pub sampling_pushdown: bool,
    /// Worker-side: auto-apply pushed-down filters to emitted batches.
    pub auto_apply_filters: bool,
    /// Batch-index support flag.
    pub supports_batch_index: bool,
    /// Partition kind string, if declared.
    pub partition_kind: Option<String>,
    /// Order-preservation mode string, if declared.
    pub order_preservation: Option<String>,
    /// Table-buffering ordering knobs (surfaced in `FunctionInfo`).
    pub sink_order_dependent: bool,
    pub source_order_dependent: bool,
    pub requires_input_batch_index: bool,
    /// Aggregate window / streaming opt-ins.
    pub supports_window: bool,
    pub streaming_partitioned: bool,
    /// Rowid table participates in late-materialization (Top-N → SEMI rewrite).
    pub late_materialization: bool,
    /// Settings the function requires (surfaced in `FunctionInfo`).
    pub required_settings: Vec<String>,
}

impl Default for FunctionMetadata {
    /// Baseline metadata: no description, categories, or required settings,
    /// every opt-in flag off, every optional field unset — except
    /// `stability`, which starts as `enums::stability::CONSISTENT`.
    fn default() -> Self {
        let default_stability = enums::stability::CONSISTENT.to_string();
        Self {
            // Descriptive fields.
            description: String::default(),
            stability: Some(default_stability),
            null_handling: None,
            categories: Vec::default(),
            return_type: None,

            // Pushdown opt-ins.
            projection_pushdown: false,
            filter_pushdown: false,
            sampling_pushdown: false,
            auto_apply_filters: false,

            // Batch index, partitioning, and ordering.
            supports_batch_index: false,
            partition_kind: None,
            order_preservation: None,
            sink_order_dependent: false,
            source_order_dependent: false,
            requires_input_batch_index: false,

            // Aggregate window / streaming opt-ins.
            supports_window: false,
            streaming_partitioned: false,

            // Late materialization and required settings.
            late_materialization: false,
            required_settings: Vec::default(),
        }
    }
}

/// Parameters delivered to `on_bind`.
#[derive(Clone, Default)]
pub struct BindParams {
    /// Input table schema (the argument columns for scalar functions).
    pub input_schema: Option<SchemaRef>,
    /// Parsed call arguments (const values + positional types).
    pub arguments: crate::arguments::Arguments,
    /// Parsed session settings.
    pub settings: crate::settings::Settings,
    /// Resolved secrets, when provided in a second-phase bind.
    pub secrets: crate::secrets::Secrets,
    /// Whether resolved secrets were provided.
    pub resolved_secrets_provided: bool,
    /// Authenticated principal name, if any.
    pub auth_principal: Option<String>,
    /// Sealed attach state.
    pub attach_opaque_data: Option<Vec<u8>>,
    /// Sealed transaction state.
    pub transaction_opaque_data: Option<Vec<u8>>,
    /// Cross-process kv/work store (for transaction-scoped caching, etc.).
    pub storage: Option<std::sync::Arc<crate::buffering::BufferingStore>>,
}

/// Result of `on_bind`.
///
/// `BindResponse::result` builds the common single-column scalar case.
#[derive(Clone)]
pub struct BindResponse {
    /// Schema of the function's output, as resolved at bind.
    pub output_schema: SchemaRef,
    /// Opaque bytes returned with the bind result; `BindResponse::result`
    /// leaves this empty.
    pub opaque_data: Vec<u8>,
}

impl BindResponse {
    /// The canonical scalar bind result: a schema with one nullable column
    /// named `result` of type `ty`, and empty opaque data.
    pub fn result(ty: DataType) -> Self {
        let result_column = arrow_schema::Field::new("result", ty, true);
        let schema = arrow_schema::Schema::new(vec![result_column]);
        Self {
            output_schema: Arc::new(schema),
            opaque_data: Vec::new(),
        }
    }
}

/// Parameters delivered to `process`.
#[derive(Clone)]
pub struct ProcessParams {
    pub output_schema: SchemaRef,
    pub input_schema: Option<SchemaRef>,
    pub execution_id: Vec<u8>,
    pub init_opaque_data: Vec<u8>,
    /// Parsed call arguments (const values).
    pub arguments: crate::arguments::Arguments,
    /// Parsed session settings.
    pub settings: crate::settings::Settings,
    /// Resolved secrets.
    pub secrets: crate::secrets::Secrets,
    /// Authenticated principal name, if any.
    pub auth_principal: Option<String>,
    /// Projection pushdown: output column indices to emit (None = all).
    pub projection_ids: Option<Vec<i64>>,
    /// Serialized pushdown filters (large_binary), if any.
    pub pushdown_filters: Option<Vec<u8>>,
    /// Side join-keys IPC batches referenced by `join_keys` filters.
    pub join_keys: Vec<Vec<u8>>,
    /// Cross-process work-queue / kv store (for parallel-scan producers).
    pub storage: Option<std::sync::Arc<crate::buffering::BufferingStore>>,
    /// ORDER BY pushdown hints.
    pub order_by_column: Option<String>,
    pub order_by_direction: Option<String>,
    pub order_by_null_order: Option<String>,
    pub order_by_limit: Option<i64>,
    /// TABLESAMPLE pushdown hints.
    pub tablesample_percentage: Option<f64>,
    pub tablesample_seed: Option<i64>,
    /// The (plaintext) attach state for this call, when carried by the request.
    pub attach_opaque_data: Option<Vec<u8>>,
    /// Time-travel `AT (TIMESTAMP|VERSION ...)` clause for this scan, read from
    /// the per-scan bind request carried on the init request. Both `None`
    /// without an AT clause. Function-backed tables read these to time-travel.
    pub at_unit: Option<String>,
    pub at_value: Option<String>,
}

/// A scalar VGI function: 1:1 row mapping, single `result` output column.
///
/// Implementors supply `name`, `metadata`, `argument_specs`, and `process`;
/// `secret_lookups` and `on_bind` have defaults.
pub trait ScalarFunction: Send + Sync {
    /// The function's name.
    fn name(&self) -> &str;

    /// Metadata for this function. The default `on_bind` reads its
    /// `return_type`.
    fn metadata(&self) -> FunctionMetadata;

    /// Per-argument specifications for this function.
    fn argument_specs(&self) -> Vec<ArgSpec>;

    /// Secret lookups to request at bind (two-phase secret resolution). When
    /// non-empty and secrets are not yet resolved, `bind` returns these and the
    /// extension re-binds with the resolved values.
    ///
    /// The default requests nothing.
    fn secret_lookups(&self, _params: &BindParams) -> Vec<crate::secrets::SecretLookup> {
        Vec::new()
    }

    /// Resolve the output schema.
    ///
    /// The default produces a single `result` column whose type is, in order
    /// of preference: the metadata's fixed `return_type`; the type of the
    /// first input field; `Int64` when there is no input schema or it has no
    /// fields.
    fn on_bind(&self, params: &BindParams) -> Result<BindResponse> {
        let result_type = match self.metadata().return_type {
            Some(fixed) => fixed,
            None => {
                let first_field = params
                    .input_schema
                    .as_ref()
                    .and_then(|schema| schema.fields().first());
                match first_field {
                    Some(field) => field.data_type().clone(),
                    None => DataType::Int64,
                }
            }
        };
        Ok(BindResponse::result(result_type))
    }

    /// Transform one input batch into a single-column output batch with the
    /// same row count.
    fn process(
        &self,
        params: &ProcessParams,
        batch: &arrow_array::RecordBatch,
    ) -> Result<arrow_array::RecordBatch>;
}