pub struct Runtime { /* private fields */ }Expand description
A runtime to execute user defined functions in JavaScript.
§Usages
- Create a new runtime with `Runtime::new`.
- For scalar functions, use `add_function` and `call`.
- For table functions, use `add_function` and `call_table_function`.
- For aggregate functions, create the function with `add_aggregate`, and then
  - create a new state with `create_state`,
  - update the state with `accumulate` or `accumulate_or_retract`,
  - merge states with `merge`,
  - finally get the result with `finish`.
Click on each function to see the example.
Implementations§
Source§impl Runtime
impl Runtime
Sourcepub async fn new() -> Result<Self>
pub async fn new() -> Result<Self>
Create a new Runtime.
§Example
let runtime = Runtime::new().await.unwrap();
runtime.set_memory_limit(Some(1 << 20)).await; // 1MB
Sourcepub async fn set_memory_limit(&self, limit: Option<usize>)
pub async fn set_memory_limit(&self, limit: Option<usize>)
Set the memory limit of the runtime.
§Example
let runtime = Runtime::new().await.unwrap();
runtime.set_memory_limit(Some(1 << 20)).await; // 1MB
Sourcepub async fn set_timeout(&mut self, timeout: Option<Duration>)
pub async fn set_timeout(&mut self, timeout: Option<Duration>)
Set the timeout of each function call.
§Example
let mut runtime = Runtime::new().await.unwrap();
runtime.set_timeout(Some(Duration::from_secs(1))).await;
Sourcepub fn inner(&self) -> &AsyncRuntime
pub fn inner(&self) -> &AsyncRuntime
Return the inner quickjs runtime.
Sourcepub fn converter_mut(&mut self) -> &mut Converter
pub fn converter_mut(&mut self) -> &mut Converter
Return the converter where you can configure the extension metadata key and values.
Sourcepub async fn add_function(
&mut self,
name: &str,
return_type: impl IntoField + Send,
code: &str,
options: FunctionOptions,
) -> Result<()>
pub async fn add_function( &mut self, name: &str, return_type: impl IntoField + Send, code: &str, options: FunctionOptions, ) -> Result<()>
Add a new scalar function or table function.
§Arguments
- `name`: The name of the function.
- `return_type`: The data type of the return value.
- `options`: The options for configuring the function.
- `code`: The JavaScript code of the function.
The code should define an exported function with the same name as the function. The function should return a value for scalar functions, or yield values for table functions.
§Example
let mut runtime = Runtime::new().await.unwrap();
// add a scalar function
runtime
.add_function(
"gcd",
DataType::Int32,
r#"
export function gcd(a, b) {
while (b != 0) {
let t = b;
b = a % b;
a = t;
}
return a;
}
"#,
FunctionOptions::default().return_null_on_null_input(),
)
.await
.unwrap();
// add a table function
runtime
.add_function(
"series",
DataType::Int32,
r#"
export function* series(n) {
for (let i = 0; i < n; i++) {
yield i;
}
}
"#,
FunctionOptions::default().return_null_on_null_input(),
)
.await
.unwrap();
Sourcepub async fn add_aggregate(
&mut self,
name: &str,
state_type: impl IntoField + Send,
output_type: impl IntoField + Send,
code: &str,
options: AggregateOptions,
) -> Result<()>
pub async fn add_aggregate( &mut self, name: &str, state_type: impl IntoField + Send, output_type: impl IntoField + Send, code: &str, options: AggregateOptions, ) -> Result<()>
Add a new aggregate function.
§Arguments
- `name`: The name of the function.
- `state_type`: The data type of the internal state.
- `output_type`: The data type of the aggregate value.
- `options`: The options for configuring the aggregate function, such as whether the function will be called when some of its arguments are null.
- `code`: The JavaScript code of the aggregate function.
The code should define at least two functions:
- `create_state() -> state`: Create a new state object.
- `accumulate(state, *args) -> state`: Accumulate a new value into the state, returning the updated state.
Optionally, the code can define:
- `finish(state) -> value`: Get the result of the aggregate function. If not defined, the state is returned as the result. In this case, `output_type` must be the same as `state_type`.
- `retract(state, *args) -> state`: Retract a value from the state, returning the updated state.
- `merge(state, state) -> state`: Merge two states, returning the merged state.
Each function must be exported.
§Example
let mut runtime = Runtime::new().await.unwrap();
runtime
.add_aggregate(
"sum",
DataType::Int32, // state_type
DataType::Int32, // output_type
r#"
export function create_state() {
return 0;
}
export function accumulate(state, value) {
return state + value;
}
export function retract(state, value) {
return state - value;
}
export function merge(state1, state2) {
return state1 + state2;
}
"#,
AggregateOptions::default().return_null_on_null_input(),
)
.await
.unwrap();
Sourcepub async fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
pub async fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
Call a scalar function.
§Example
// suppose we have created a scalar function `gcd`
// see the example in `add_function`
let schema = Schema::new(vec![
Field::new("x", DataType::Int32, true),
Field::new("y", DataType::Int32, true),
]);
let arg0 = Int32Array::from(vec![Some(25), None]);
let arg1 = Int32Array::from(vec![Some(15), None]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0), Arc::new(arg1)]).unwrap();
let output = runtime.call("gcd", &input).await.unwrap();
assert_eq!(&**output.column(0), &Int32Array::from(vec![Some(5), None]));
Sourcepub fn call_table_function<'a>(
&'a self,
name: &'a str,
input: &'a RecordBatch,
chunk_size: usize,
) -> Result<RecordBatchIter<'a>>
pub fn call_table_function<'a>( &'a self, name: &'a str, input: &'a RecordBatch, chunk_size: usize, ) -> Result<RecordBatchIter<'a>>
Call a table function.
§Example
// suppose we have created a table function `series`
// see the example in `add_function`
let schema = Schema::new(vec![Field::new("x", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let mut outputs = runtime.call_table_function("series", &input, 10).unwrap();
let output = outputs.next().await.unwrap().unwrap();
let pretty = arrow_cast::pretty::pretty_format_batches(&[output]).unwrap().to_string();
assert_eq!(pretty, r#"
+-----+--------+
| row | series |
+-----+--------+
| 0   | 0      |
| 2   | 0      |
| 2   | 1      |
| 2   | 2      |
+-----+--------+"#.trim());
Sourcepub async fn create_state(&self, name: &str) -> Result<ArrayRef>
pub async fn create_state(&self, name: &str) -> Result<ArrayRef>
Create a new state for an aggregate function.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").await.unwrap();
assert_eq!(&*state, &Int32Array::from(vec![0]));
Sourcepub async fn accumulate(
&self,
name: &str,
state: &dyn Array,
input: &RecordBatch,
) -> Result<ArrayRef>
pub async fn accumulate( &self, name: &str, state: &dyn Array, input: &RecordBatch, ) -> Result<ArrayRef>
Call accumulate of an aggregate function.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").await.unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate("sum", &state, &input).await.unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));
Sourcepub async fn accumulate_or_retract(
&self,
name: &str,
state: &dyn Array,
ops: &BooleanArray,
input: &RecordBatch,
) -> Result<ArrayRef>
pub async fn accumulate_or_retract( &self, name: &str, state: &dyn Array, ops: &BooleanArray, input: &RecordBatch, ) -> Result<ArrayRef>
Call accumulate or retract of an aggregate function.
The `ops` argument is a boolean array that indicates, for each row, whether to accumulate or retract it:
`false` for accumulate and `true` for retract.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").await.unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let ops = BooleanArray::from(vec![false, false, true, false]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate_or_retract("sum", &state, &ops, &input).await.unwrap();
assert_eq!(&*state, &Int32Array::from(vec![3]));
Sourcepub async fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
pub async fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
Merge states of an aggregate function.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let states = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let state = runtime.merge("sum", &states).await.unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));
Sourcepub async fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
pub async fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
Get the result of an aggregate function.
If the finish function is not defined, the state is returned as the result.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let states: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(5)]));
let outputs = runtime.finish("sum", &states).await.unwrap();
assert_eq!(&outputs, &states);
pub fn context(&self) -> &AsyncContext
Sourcepub async fn enable_fetch(&self) -> Result<()>
pub async fn enable_fetch(&self) -> Result<()>
Enable the fetch API in the Runtime.
See the `fetch` module for more details.