Struct arrow_udf_js::Runtime
pub struct Runtime { /* private fields */ }
A runtime to execute user defined functions in JavaScript.
§Usages
- Create a new runtime with Runtime::new.
- For scalar functions, use add_function and call.
- For table functions, use add_function and call_table_function.
- For aggregate functions, create the function with add_aggregate, and then
  - create a new state with create_state,
  - update the state with accumulate or accumulate_or_retract,
  - merge states with merge,
  - finally get the result with finish.
See each function below for an example.
Implementations§
impl Runtime
pub fn set_memory_limit(&self, limit: Option<usize>)
Set the memory limit of the runtime, in bytes.
§Example
let runtime = Runtime::new().unwrap();
runtime.set_memory_limit(Some(1 << 20)); // 1 MiB
pub fn set_timeout(&mut self, timeout: Option<Duration>)
Set the timeout of each function call.
§Example
let mut runtime = Runtime::new().unwrap();
runtime.set_timeout(Some(Duration::from_secs(1)));
pub fn memory_usage(&self) -> MemoryUsage
Get the memory usage of the internal QuickJS runtime.
§Example
let runtime = Runtime::new().unwrap();
let usage = runtime.memory_usage();
pub fn converter_mut(&mut self) -> &mut Converter
Return a mutable reference to the converter, where you can configure the extension metadata keys and values.
pub fn add_function(
    &mut self,
    name: &str,
    return_type: impl IntoField,
    mode: CallMode,
    code: &str,
) -> Result<()>
Add a new scalar function or table function.
§Arguments
- name: The name of the function.
- return_type: The data type of the return value.
- mode: Whether the function will be called when some of its arguments are null.
- code: The JavaScript code of the function.
The code must export a function whose name is the same as the name argument. A scalar function returns a single value; a table function is a generator that yields its values.
§Example
let mut runtime = Runtime::new().unwrap();
// add a scalar function
runtime
    .add_function(
        "gcd",
        DataType::Int32,
        CallMode::ReturnNullOnNullInput,
        r#"
        export function gcd(a, b) {
            while (b != 0) {
                let t = b;
                b = a % b;
                a = t;
            }
            return a;
        }
"#,
    )
    .unwrap();
// add a table function
runtime
    .add_function(
        "series",
        DataType::Int32,
        CallMode::ReturnNullOnNullInput,
        r#"
        export function* series(n) {
            for (let i = 0; i < n; i++) {
                yield i;
            }
        }
"#,
    )
    .unwrap();
pub fn add_function_with_handler(
    &mut self,
    name: &str,
    return_type: impl IntoField,
    mode: CallMode,
    code: &str,
    handler: &str,
) -> Result<()>
Add a new scalar function or table function with a custom handler name.
§Arguments
- handler: The name of the function in the JavaScript code to be called.
- others: Same as add_function.
pub fn add_aggregate(
    &mut self,
    name: &str,
    state_type: impl IntoField,
    output_type: impl IntoField,
    mode: CallMode,
    code: &str,
) -> Result<()>
Add a new aggregate function.
§Arguments
- name: The name of the function.
- state_type: The data type of the internal state.
- output_type: The data type of the aggregate value.
- mode: Whether the function will be called when some of its arguments are null.
- code: The JavaScript code of the aggregate function.
The code should define at least two functions:
- create_state() -> state: Create a new state object.
- accumulate(state, ...args) -> state: Accumulate a new value into the state, returning the updated state.
Optionally, the code can define:
- finish(state) -> value: Get the result of the aggregate function. If not defined, the state is returned as the result. In this case, output_type must be the same as state_type.
- retract(state, ...args) -> state: Retract a value from the state, returning the updated state.
- merge(state, state) -> state: Merge two states, returning the merged state.
Each function must be exported.
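The contract above may be easier to follow with a case where the state and the output differ. The following is a plain-Rust sketch of a hypothetical avg aggregate. It only mirrors the shape of the five JavaScript functions; AvgState and the function bodies are illustrative assumptions and not part of arrow_udf_js.

```rust
/// State of a hypothetical `avg` aggregate: running sum and row count.
/// This plays the role of `state_type`; the output is a float.
#[derive(Debug, Clone, Copy, PartialEq)]
struct AvgState {
    sum: i64,
    count: i64,
}

/// Required: build the initial (empty) state.
fn create_state() -> AvgState {
    AvgState { sum: 0, count: 0 }
}

/// Required: fold one value into the state and return the new state.
fn accumulate(state: AvgState, value: i64) -> AvgState {
    AvgState { sum: state.sum + value, count: state.count + 1 }
}

/// Optional: undo a previous `accumulate` of the same value.
fn retract(state: AvgState, value: i64) -> AvgState {
    AvgState { sum: state.sum - value, count: state.count - 1 }
}

/// Optional: combine two partial states.
fn merge(a: AvgState, b: AvgState) -> AvgState {
    AvgState { sum: a.sum + b.sum, count: a.count + b.count }
}

/// Optional: turn the state into the final value (the `output_type`).
/// An empty state has no average, so the sketch returns `None`.
fn finish(state: AvgState) -> Option<f64> {
    if state.count == 0 {
        None
    } else {
        Some(state.sum as f64 / state.count as f64)
    }
}
```

Because finish changes the type here, it could not be omitted: leaving it out only works when the state itself is the result, as in the sum example.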
§Example
let mut runtime = Runtime::new().unwrap();
runtime
    .add_aggregate(
        "sum",
        DataType::Int32, // state_type
        DataType::Int32, // output_type
        CallMode::ReturnNullOnNullInput,
        r#"
        export function create_state() {
            return 0;
        }
        export function accumulate(state, value) {
            return state + value;
        }
        export function retract(state, value) {
            return state - value;
        }
        export function merge(state1, state2) {
            return state1 + state2;
        }
"#,
    )
    .unwrap();
pub fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
Call a scalar function.
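As a rough mental model, a scalar call applies the function once per input row and produces one output value per row. The sketch below models that in plain Rust for two Int32 columns, assuming the function was registered with CallMode::ReturnNullOnNullInput. The helper call_scalar is hypothetical and does not reflect how the runtime is implemented.

```rust
/// Euclid's algorithm, mirroring the JavaScript `gcd` from the
/// `add_function` example.
fn gcd(mut a: i32, mut b: i32) -> i32 {
    while b != 0 {
        let t = b;
        b = a % b;
        a = t;
    }
    a
}

/// Hypothetical model of a scalar call over two nullable columns.
/// A row with any null argument yields null without calling `f`,
/// which is the behaviour the name `ReturnNullOnNullInput` suggests.
fn call_scalar(
    arg0: &[Option<i32>],
    arg1: &[Option<i32>],
    f: impl Fn(i32, i32) -> i32,
) -> Vec<Option<i32>> {
    arg0.iter()
        .zip(arg1)
        .map(|(a, b)| match (a, b) {
            (Some(a), Some(b)) => Some(f(*a, *b)),
            _ => None,
        })
        .collect()
}
```

With the columns [25, null] and [15, null], this model produces [5, null], the same values the example below asserts.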
§Example
// suppose we have created a scalar function `gcd`
// see the example in `add_function`
let schema = Schema::new(vec![
    Field::new("x", DataType::Int32, true),
    Field::new("y", DataType::Int32, true),
]);
let arg0 = Int32Array::from(vec![Some(25), None]);
let arg1 = Int32Array::from(vec![Some(15), None]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0), Arc::new(arg1)]).unwrap();
let output = runtime.call("gcd", &input).unwrap();
assert_eq!(&**output.column(0), &Int32Array::from(vec![Some(5), None]));
pub fn call_table_function<'a>(
    &'a self,
    name: &'a str,
    input: &'a RecordBatch,
    chunk_size: usize,
) -> Result<RecordBatchIter<'a>>
Call a table function.
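A table function can emit any number of values per input row, so each output row carries the index of the input row that produced it. The plain-Rust sketch below models that expansion. It assumes that null inputs produce no output rows and that chunk_size caps the number of rows per output batch; both are inferred from the example rather than confirmed. The helper call_table is hypothetical and, unlike the real iterator, computes everything eagerly.

```rust
/// Rust analogue of the JavaScript generator `series` from the
/// `add_function` example: yields 0, 1, ..., n - 1.
fn series(n: i32) -> impl Iterator<Item = i32> {
    0..n
}

/// Hypothetical model of a table function call on one nullable column.
/// Returns batches of `(row, value)` pairs with at most `chunk_size`
/// pairs per batch.
fn call_table(input: &[Option<i32>], chunk_size: usize) -> Vec<Vec<(usize, i32)>> {
    assert!(chunk_size > 0, "chunk_size must be positive");
    let rows: Vec<(usize, i32)> = input
        .iter()
        .enumerate()
        // Skip null inputs: they contribute no output rows in this model.
        .filter_map(|(row, arg)| arg.map(|n| (row, n)))
        // Tag every yielded value with the index of its input row.
        .flat_map(|(row, n)| series(n).map(move |v| (row, v)))
        .collect();
    rows.chunks(chunk_size).map(|c| c.to_vec()).collect()
}
```

For the input [1, null, 3] and a chunk size of 10, the model yields a single batch with the pairs (0, 0), (2, 0), (2, 1), (2, 2), matching the row and series columns in the example below.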
§Example
// suppose we have created a table function `series`
// see the example in `add_function`
let schema = Schema::new(vec![Field::new("x", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let mut outputs = runtime.call_table_function("series", &input, 10).unwrap();
let output = outputs.next().unwrap().unwrap();
let pretty = arrow_cast::pretty::pretty_format_batches(&[output]).unwrap().to_string();
assert_eq!(pretty, r#"
+-----+--------+
| row | series |
+-----+--------+
| 0   | 0      |
| 2   | 0      |
| 2   | 1      |
| 2   | 2      |
+-----+--------+"#.trim());
pub fn create_state(&self, name: &str) -> Result<ArrayRef>
Create a new state for an aggregate function.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
assert_eq!(&*state, &Int32Array::from(vec![0]));
pub fn accumulate(
    &self,
    name: &str,
    state: &dyn Array,
    input: &RecordBatch,
) -> Result<ArrayRef>
Accumulate a batch of input rows into the state of an aggregate function.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate("sum", &state, &input).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));
pub fn accumulate_or_retract(
    &self,
    name: &str,
    state: &dyn Array,
    ops: &BooleanArray,
    input: &RecordBatch,
) -> Result<ArrayRef>
Accumulate or retract each input row into the state of an aggregate function.
ops is a boolean array that selects the operation for each row: false accumulates the row and true retracts it.
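The effect of the ops array can be modelled in plain Rust for the sum aggregate. The sketch below assumes one op per input row and assumes that null rows are skipped, as the ReturnNullOnNullInput mode of the example aggregate suggests. The free function is a hypothetical stand-in and not the library's implementation.

```rust
/// Hypothetical model of `accumulate_or_retract` for a `sum` aggregate.
/// `ops[i] == false` accumulates row `i`; `ops[i] == true` retracts it.
/// Null rows leave the state unchanged (an assumption of this sketch).
fn accumulate_or_retract(state: i32, ops: &[bool], values: &[Option<i32>]) -> i32 {
    assert_eq!(ops.len(), values.len(), "expected one op per row");
    ops.iter()
        .zip(values)
        .fold(state, |acc, (&retract, value)| match value {
            Some(v) if retract => acc - v, // retract: state - value
            Some(v) => acc + v,            // accumulate: state + value
            None => acc,                   // null row: skipped
        })
}
```

For the values [1, null, 3, 5] with ops [false, false, true, false], the state moves 0, 1, 1, -2, 3, which agrees with the result of 3 in the example below.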
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let ops = BooleanArray::from(vec![false, false, true, false]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate_or_retract("sum", &state, &ops, &input).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![3]));
pub fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
Merge states of an aggregate function.
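Conceptually, merging reduces an array of partial states to a single state by applying the two-argument merge function pairwise. The plain-Rust sketch below shows that reduction. It assumes null states are skipped, which is consistent with the example result but not stated in the text, and the helper merge_states is hypothetical. How the runtime treats an array with no non-null states is not specified here; the sketch simply returns None.

```rust
/// Hypothetical model of merging an array of nullable states with a
/// user-supplied two-argument `merge` function.
fn merge_states<S: Copy>(
    states: &[Option<S>],
    merge: impl Fn(S, S) -> S,
) -> Option<S> {
    states
        .iter()
        .flatten() // drop null states (assumption of this sketch)
        .copied()
        .reduce(merge) // left-to-right pairwise merge
}
```

With the states [1, null, 3, 5] and addition as the merge function, the model returns 9, the same value as the example below.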
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let states = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let state = runtime.merge("sum", &states).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));
pub fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
Get the result of an aggregate function.
If the finish function is not defined, the state is returned as the result.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let states: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(5)]));
let outputs = runtime.finish("sum", &states).unwrap();
assert_eq!(&outputs, &states);