Skip to main content

uni_plugin_wasm/
adapter_procedure.rs

1//! Procedure adapter — bridges a CM `procedure-plugin` instance to
2//! [`ProcedurePlugin`].
3//!
4//! Port of `uni_plugin_extism::adapter_procedure`. Eagerly collects
5//! the plugin's output IPC stream into a `RecordBatchStreamAdapter`.
6
7// Rust guideline compliant
8
9use std::sync::Arc;
10
11use arrow::array::RecordBatch;
12use arrow_schema::{Field, Schema, SchemaRef};
13use datafusion::execution::SendableRecordBatchStream;
14use datafusion::logical_expr::ColumnarValue;
15use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
16use futures::stream;
17use uni_plugin::QName;
18use uni_plugin::adapter_common::arrow_types::argtype_to_arrow;
19use uni_plugin::errors::FnError;
20use uni_plugin::traits::procedure::{ProcedureContext, ProcedurePlugin, ProcedureSignature};
21use uni_plugin_wasm_rt::ipc::{decode_batches, encode_batch};
22
23use crate::adapter_common::{acquire, ipc_to_fn_err};
24use crate::loader::ProcedurePluginInstance;
25use crate::pool::WasmInstancePool;
26
27/// `ProcedurePlugin` adapter wrapping a CM procedure-plugin pool.
28pub struct ComponentProcedure {
29    pool: Arc<WasmInstancePool<ProcedurePluginInstance>>,
30    qname: QName,
31    sig: ProcedureSignature,
32    args_schema: SchemaRef,
33    yields_schema: SchemaRef,
34}
35
36impl std::fmt::Debug for ComponentProcedure {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("ComponentProcedure")
39            .field("qname", &self.qname)
40            .field("signature", &self.sig)
41            .finish_non_exhaustive()
42    }
43}
44
45impl ComponentProcedure {
46    /// Construct a new adapter against the supplied pool.
47    #[must_use]
48    pub fn new(
49        pool: Arc<WasmInstancePool<ProcedurePluginInstance>>,
50        qname: QName,
51        sig: ProcedureSignature,
52    ) -> Self {
53        let args_schema = build_args_schema(&sig);
54        let yields_schema = Arc::new(Schema::new(sig.yields.clone()));
55        Self {
56            pool,
57            qname,
58            sig,
59            args_schema,
60            yields_schema,
61        }
62    }
63}
64
65impl ProcedurePlugin for ComponentProcedure {
66    fn signature(&self) -> &ProcedureSignature {
67        &self.sig
68    }
69
70    fn invoke(
71        &self,
72        _ctx: ProcedureContext<'_>,
73        args: &[ColumnarValue],
74    ) -> Result<SendableRecordBatchStream, FnError> {
75        let arrays: Vec<arrow::array::ArrayRef> = args
76            .iter()
77            .map(|c| {
78                c.clone().into_array(1).map_err(|e| {
79                    FnError::new(
80                        FnError::CODE_TYPE_COERCION,
81                        format!("ColumnarValue::into_array: {e}"),
82                    )
83                })
84            })
85            .collect::<Result<_, _>>()?;
86        if arrays.len() != self.args_schema.fields().len() {
87            return Err(FnError::new(
88                FnError::CODE_TYPE_COERCION,
89                format!(
90                    "procedure `{}` expected {} args; got {}",
91                    self.qname,
92                    self.args_schema.fields().len(),
93                    arrays.len()
94                ),
95            ));
96        }
97        let batch = RecordBatch::try_new(Arc::clone(&self.args_schema), arrays).map_err(|e| {
98            FnError::new(
99                FnError::CODE_TYPE_COERCION,
100                format!("procedure `{}` args RecordBatch: {e}", self.qname),
101            )
102        })?;
103        let ipc = encode_batch(&batch).map_err(ipc_to_fn_err)?;
104
105        let qname_str = self.qname.to_string();
106        let mut leased = acquire(&self.pool, "procedure")?;
107        let out_bytes = leased
108            .get_mut()
109            .invoke_procedure(&qname_str, &ipc)
110            .map_err(|e| {
111                FnError::new(
112                    FnError::CODE_UNEXPECTED_NULL,
113                    format!("wasm invoke_procedure `{qname_str}`: {e}"),
114                )
115            })?;
116        drop(leased);
117
118        let batches = decode_batches(&out_bytes).map_err(ipc_to_fn_err)?;
119        for (i, b) in batches.iter().enumerate() {
120            if b.schema().fields() != self.yields_schema.fields() {
121                return Err(FnError::new(
122                    FnError::CODE_TYPE_COERCION,
123                    format!("procedure `{qname_str}` batch[{i}] schema mismatch"),
124                ));
125            }
126        }
127        let schema = Arc::clone(&self.yields_schema);
128        let stream = stream::iter(batches.into_iter().map(Ok));
129        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
130    }
131}
132
133fn build_args_schema(sig: &ProcedureSignature) -> SchemaRef {
134    let fields: Vec<Field> = sig
135        .args
136        .iter()
137        .enumerate()
138        .map(|(i, a)| Field::new(format!("arg{i}"), argtype_to_arrow(&a.ty), true))
139        .collect();
140    Arc::new(Schema::new(fields))
141}