vgi 0.1.2

Build VGI workers in Rust to extend DuckDB with custom catalogs, functions, and tables over Apache Arrow IPC
Documentation
// Copyright 2025, 2026 Query Farm LLC - https://query.farm

//! Table (producer) function model: generate output batches without input.
//!
//! Mirrors Go `initTable` + `TableProducerState`. The function creates a
//! per-execution [`TableProducer`] whose `next_batch` is called repeatedly; it
//! returns one batch per tick and `None` once the scan is exhausted.

use arrow_schema::SchemaRef;
use vgi_rpc::Result;

use crate::function::{ArgSpec, BindParams, BindResponse, FunctionMetadata, ProcessParams};

/// Cardinality estimate for a table function.
///
/// Both bounds are optional; [`Default`] yields `None` for each, meaning
/// "unknown".
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct TableCardinality {
    /// Estimated cardinality (row count) of the function's output, if known.
    pub estimate: Option<i64>,
    /// Maximum cardinality (row count) of the function's output, if known.
    pub max: Option<i64>,
}

/// A per-execution producer. Holds the function's mutable scan state.
///
/// One instance is built per execution by [`TableFunction::producer`] and is
/// then asked repeatedly for [`next_batch`](Self::next_batch) until it
/// reports exhaustion with `None`.
pub trait TableProducer: Send {
    /// Returns the next batch, or `None` when the scan is exhausted.
    ///
    /// The dispatch adapter applies projection / auto-filter pushdown to each
    /// batch before emitting, so producers stay free of that concern. `out` is
    /// provided only for `client_log` — do NOT emit through it (the adapter
    /// emits the returned batch).
    fn next_batch(
        &mut self,
        out: &mut vgi_rpc::OutputCollector,
    ) -> Result<Option<arrow_array::RecordBatch>>;
    /// Serialize the producer's in-progress scan position for HTTP continuation
    /// (default empty — producers whose whole result is regenerable from the
    /// shared work queue alone need none). Work-queue producers that span a
    /// popped chunk across multiple batches MUST encode their partial-chunk
    /// cursor here, since the chunk is destructively removed from the queue on
    /// pop and cannot be re-derived on resume.
    fn encode_resume(&self) -> Vec<u8> {
        Vec::new()
    }
    /// Restore the partial-chunk cursor after rebuilding from an HTTP state
    /// token. Inverse of [`encode_resume`](Self::encode_resume). The default
    /// ignores the bytes.
    fn restore_resume(&mut self, _bytes: &[u8]) {}
    /// Per-batch wire metadata for the batch just returned by `next_batch`
    /// (e.g. `vgi_batch_index` for `supports_batch_index` functions). Default
    /// none. Called once after each `next_batch` that returns `Some`.
    fn last_metadata(&self) -> Option<std::collections::HashMap<String, String>> {
        None
    }
    /// Called before each `next_batch` with the per-tick dynamic pushdown
    /// filters (from the `vgi_pushdown_filters` request metadata), if any. Lets
    /// a producer observe a tightening Top-N filter. Default ignores them.
    fn on_dynamic_filters(&mut self, _filters: Option<&crate::pushdown::PushdownFilters>) {}
}

/// A table (producer) VGI function: takes no input rows and generates its
/// output through a per-execution [`TableProducer`].
pub trait TableFunction: Send + Sync {
    /// The function's name.
    fn name(&self) -> &str;

    /// Descriptive metadata for this function.
    fn metadata(&self) -> FunctionMetadata;

    /// Specifications of the arguments this function accepts.
    fn argument_specs(&self) -> Vec<ArgSpec>;

    /// Works out the output schema from the arguments supplied at bind time.
    fn on_bind(&self, params: &BindParams) -> Result<BindResponse>;

    /// Parallelism hint: how many workers may run this function. The default
    /// of `1` means a single worker.
    fn max_workers(&self, _params: &BindParams) -> i64 {
        1
    }

    /// One-time global setup performed by the primary worker.
    ///
    /// It runs once per execution, before any producer is built, when DuckDB
    /// sends the init without an execution_id. Secondary workers, whose init
    /// carries an execution_id, do not call it. Parallel-scan functions use
    /// this hook to enqueue work items on `params.storage`'s queue. The
    /// default does nothing.
    fn on_init(&self, _params: &ProcessParams) -> Result<()> {
        Ok(())
    }

    /// Cardinality estimate for this call, when one is available. Defaults
    /// to `None`.
    fn cardinality(&self, _params: &BindParams) -> Option<TableCardinality> {
        None
    }

    /// Per-column optimizer statistics for this call, when available.
    /// Defaults to `None`.
    fn statistics(&self, _params: &BindParams) -> Option<Vec<crate::statistics::CatColStat>> {
        None
    }

    /// Secret types this function requires. Declaring any of them triggers
    /// the two-phase secret bind. Defaults to none.
    fn secret_lookups(&self, _params: &BindParams) -> Vec<crate::secrets::SecretLookup> {
        vec![]
    }

    /// Creates the producer for one execution. It must emit
    /// `params.output_schema`, which projection may have narrowed relative to
    /// the bound schema.
    fn producer(&self, params: &ProcessParams) -> Result<Box<dyn TableProducer>>;

    /// Post-execution diagnostics for EXPLAIN ANALYZE, returned as
    /// Extra Info key/value pairs.
    ///
    /// Implementations read back whatever their producer stored in `storage`
    /// under `global_execution_id`. The default reports nothing.
    fn dynamic_to_string(
        &self,
        _global_execution_id: &[u8],
        _storage: &crate::buffering::BufferingStore,
    ) -> Vec<(String, String)> {
        vec![]
    }
}

/// Narrow a full schema to the projected columns (`projection_ids`).
///
/// `ids` holds zero-based column indices into `full`, in the order the
/// projected schema should list them. `None` or an empty list means "no
/// projection", and `full` is returned unchanged (a cheap `Arc` clone).
///
/// Indices that are negative or out of range for `full` are skipped rather
/// than reported, so the result may have fewer fields than `ids` has entries.
/// Schema-level metadata of `full` is carried over to the projected schema,
/// so both branches agree on metadata.
pub fn project_schema(full: &SchemaRef, ids: &Option<Vec<i64>>) -> SchemaRef {
    match ids {
        Some(ids) if !ids.is_empty() => {
            let fields: Vec<_> = ids
                .iter()
                // `usize::try_from` rejects negative ids explicitly. On 32-bit
                // targets it also rejects ids above `usize::MAX`, which an
                // `as` cast would silently truncate into a wrong in-range
                // index.
                .filter_map(|&i| usize::try_from(i).ok())
                .filter_map(|i| full.fields().get(i).cloned())
                .collect();
            // `Schema::new` would drop `full`'s schema-level metadata; keep it,
            // as the unprojected branch below does.
            std::sync::Arc::new(arrow_schema::Schema::new_with_metadata(
                fields,
                full.metadata().clone(),
            ))
        }
        _ => full.clone(),
    }
}