Skip to main content

uni_plugin/adapter_common/
batch_builder.rs

1//! Single-row [`RecordBatch`] / stream builders shared by loader adapters.
2//!
3//! `uni-plugin-rhai`'s procedure adapter materializes the plugin's
4//! returned rows into one `RecordBatch` and then wraps it in a
5//! `RecordBatchStreamAdapter` yielding exactly one item. The 1-batch
6//! stream pattern is generic and unrelated to rhai. This module hosts
7//! the shared implementation so future adapters (custom in-process
8//! procedures, builtin synthetic catalogs) reuse it.
9//!
10//! These helpers are pure Arrow utilities; they do **not** depend on any
11//! plugin-loader feature gate.
12
13// Rust guideline compliant
14
15use arrow_array::{ArrayRef, RecordBatch};
16use arrow_schema::{ArrowError, SchemaRef};
17use datafusion::execution::SendableRecordBatchStream;
18use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
19use futures::stream;
20
21/// Construct a single-row [`RecordBatch`] from the supplied schema and
22/// column arrays.
23///
24/// Thin wrapper over `RecordBatch::try_new` whose only purpose is to
25/// give shared call sites a named function (so the rhai loader, future
26/// synthetic catalogs, and tests share a vocabulary).
27///
28/// # Errors
29///
30/// Propagates [`ArrowError::SchemaError`] / [`ArrowError::InvalidArgumentError`]
31/// when the supplied columns do not match the schema (column count, type
32/// mismatch, or row-count mismatch).
33pub fn single_row_record_batch(
34    schema: SchemaRef,
35    cols: Vec<ArrayRef>,
36) -> Result<RecordBatch, ArrowError> {
37    RecordBatch::try_new(schema, cols)
38}
39
40/// Wrap a single [`RecordBatch`] in a one-item
41/// [`SendableRecordBatchStream`].
42///
43/// Used by procedure adapters whose plugin produced exactly one batch
44/// (and by tests that need a stream-shaped fixture for a one-row
45/// constant input). The yielded schema is `Arc`-cloned from the
46/// supplied `batch`.
47#[must_use]
48pub fn batch_into_stream(batch: RecordBatch) -> SendableRecordBatchStream {
49    let schema = batch.schema();
50    let stream = stream::iter(vec![Ok(batch)]);
51    Box::pin(RecordBatchStreamAdapter::new(schema, stream))
52}