pub struct Runtime { /* private fields */ }
A runtime to execute user defined functions in JavaScript.
§Usages
- Create a new runtime with Runtime::new.
- For scalar functions, use add_function and call.
- For table functions, use add_function and call_table_function.
- For aggregate functions, create the function with add_aggregate, and then
  - create a new state with create_state,
  - update the state with accumulate or accumulate_or_retract,
  - merge states with merge,
  - finally get the result with finish.
Click on each function to see the example.
Implementations§
Source§impl Runtime
impl Runtime
Sourcepub async fn new() -> Result<Self>
pub async fn new() -> Result<Self>
Create a new Runtime.
§Example
let runtime = Runtime::new().await.unwrap();
runtime.set_memory_limit(Some(1 << 20)).await; // 1MB
Sourcepub async fn set_memory_limit(&self, limit: Option<usize>)
pub async fn set_memory_limit(&self, limit: Option<usize>)
Set the memory limit of the runtime.
§Example
let runtime = Runtime::new().await.unwrap();
runtime.set_memory_limit(Some(1 << 20)).await; // 1MB
Sourcepub async fn set_timeout(&mut self, timeout: Option<Duration>)
pub async fn set_timeout(&mut self, timeout: Option<Duration>)
Set the timeout of each function call.
§Example
let mut runtime = Runtime::new().await.unwrap();
runtime.set_timeout(Some(Duration::from_secs(1))).await;Sourcepub fn inner(&self) -> &AsyncRuntime
pub fn inner(&self) -> &AsyncRuntime
Return the inner quickjs runtime.
Sourcepub fn converter_mut(&mut self) -> &mut Converter
pub fn converter_mut(&mut self) -> &mut Converter
Return the converter where you can configure the extension metadata key and values.
Sourcepub async fn add_function(
&mut self,
name: &str,
return_type: impl IntoField + Send,
code: &str,
options: FunctionOptions,
) -> Result<()>
pub async fn add_function( &mut self, name: &str, return_type: impl IntoField + Send, code: &str, options: FunctionOptions, ) -> Result<()>
Add a new scalar function or table function.
§Arguments
- name: The name of the function.
- return_type: The data type of the return value.
- options: The options for configuring the function.
- code: The JavaScript code of the function.
The code should define an exported function with the same name as the function. The function should return a value for scalar functions, or yield values for table functions.
§Example
let mut runtime = Runtime::new().await.unwrap();
// add a scalar function
runtime
.add_function(
"gcd",
DataType::Int32,
r#"
export function gcd(a, b) {
while (b != 0) {
let t = b;
b = a % b;
a = t;
}
return a;
}
"#,
FunctionOptions::default().return_null_on_null_input(),
)
.await
.unwrap();
// add a table function
runtime
.add_function(
"series",
DataType::Int32,
r#"
export function* series(n) {
for (let i = 0; i < n; i++) {
yield i;
}
}
"#,
FunctionOptions::default().return_null_on_null_input(),
)
.await
.unwrap();Sourcepub async fn add_aggregate(
&mut self,
name: &str,
state_type: impl IntoField + Send,
output_type: impl IntoField + Send,
code: &str,
options: AggregateOptions,
) -> Result<()>
pub async fn add_aggregate( &mut self, name: &str, state_type: impl IntoField + Send, output_type: impl IntoField + Send, code: &str, options: AggregateOptions, ) -> Result<()>
Add a new aggregate function.
§Arguments
- name: The name of the function.
- state_type: The data type of the internal state.
- output_type: The data type of the aggregate value.
- code: The JavaScript code of the aggregate function.
- options: The options for configuring the aggregate function, e.g. whether the function will be called when some of its arguments are null.
The code should define at least two functions:
- create_state() -> state: Create a new state object.
- accumulate(state, *args) -> state: Accumulate a new value into the state, returning the updated state.
optionally, the code can define:
- finish(state) -> value: Get the result of the aggregate function. If not defined, the state is returned as the result. In this case, output_type must be the same as state_type.
- retract(state, *args) -> state: Retract a value from the state, returning the updated state.
- merge(state, state) -> state: Merge two states, returning the merged state.
Each function must be exported.
§Example
let mut runtime = Runtime::new().await.unwrap();
runtime
.add_aggregate(
"sum",
DataType::Int32, // state_type
DataType::Int32, // output_type
r#"
export function create_state() {
return 0;
}
export function accumulate(state, value) {
return state + value;
}
export function retract(state, value) {
return state - value;
}
export function merge(state1, state2) {
return state1 + state2;
}
"#,
AggregateOptions::default().return_null_on_null_input(),
)
.await
.unwrap();Sourcepub async fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
pub async fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
Call a scalar function.
§Example
// suppose we have created a scalar function `gcd`
// see the example in `add_function`
let schema = Schema::new(vec![
Field::new("x", DataType::Int32, true),
Field::new("y", DataType::Int32, true),
]);
let arg0 = Int32Array::from(vec![Some(25), None]);
let arg1 = Int32Array::from(vec![Some(15), None]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0), Arc::new(arg1)]).unwrap();
let output = runtime.call("gcd", &input).await.unwrap();
assert_eq!(&**output.column(0), &Int32Array::from(vec![Some(5), None]));Sourcepub fn call_table_function<'a>(
&'a self,
name: &'a str,
input: &'a RecordBatch,
chunk_size: usize,
) -> Result<RecordBatchIter<'a>>
pub fn call_table_function<'a>( &'a self, name: &'a str, input: &'a RecordBatch, chunk_size: usize, ) -> Result<RecordBatchIter<'a>>
Call a table function.
§Example
// suppose we have created a table function `series`
// see the example in `add_function`
let schema = Schema::new(vec![Field::new("x", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let mut outputs = runtime.call_table_function("series", &input, 10).unwrap();
let output = outputs.next().await.unwrap().unwrap();
let pretty = arrow_cast::pretty::pretty_format_batches(&[output]).unwrap().to_string();
assert_eq!(pretty, r#"
+-----+--------+
| row | series |
+-----+--------+
| 0 | 0 |
| 2 | 0 |
| 2 | 1 |
| 2 | 2 |
+-----+--------+"#.trim());Sourcepub async fn create_state(&self, name: &str) -> Result<ArrayRef>
pub async fn create_state(&self, name: &str) -> Result<ArrayRef>
Create a new state for an aggregate function.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").await.unwrap();
assert_eq!(&*state, &Int32Array::from(vec![0]));Sourcepub async fn accumulate(
&self,
name: &str,
state: &dyn Array,
input: &RecordBatch,
) -> Result<ArrayRef>
pub async fn accumulate( &self, name: &str, state: &dyn Array, input: &RecordBatch, ) -> Result<ArrayRef>
Call accumulate of an aggregate function.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").await.unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate("sum", &state, &input).await.unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));Sourcepub async fn accumulate_or_retract(
&self,
name: &str,
state: &dyn Array,
ops: &BooleanArray,
input: &RecordBatch,
) -> Result<ArrayRef>
pub async fn accumulate_or_retract( &self, name: &str, state: &dyn Array, ops: &BooleanArray, input: &RecordBatch, ) -> Result<ArrayRef>
Call accumulate or retract of an aggregate function.
ops is a boolean array that indicates, for each row, whether to accumulate or retract:
false for accumulate and true for retract.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let state = runtime.create_state("sum").await.unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let ops = BooleanArray::from(vec![false, false, true, false]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate_or_retract("sum", &state, &ops, &input).await.unwrap();
assert_eq!(&*state, &Int32Array::from(vec![3]));Sourcepub async fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
pub async fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
Merge states of an aggregate function.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let states = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let state = runtime.merge("sum", &states).await.unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));Sourcepub async fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
pub async fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
Get the result of an aggregate function.
If the finish function is not defined, the state is returned as the result.
§Example
// suppose we have created a sum aggregate function
// see the example in `add_aggregate`
let states: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(5)]));
let outputs = runtime.finish("sum", &states).await.unwrap();
assert_eq!(&outputs, &states);
pub fn context(&self) -> &AsyncContext
Sourcepub async fn enable_fetch(&self) -> Result<()>
pub async fn enable_fetch(&self) -> Result<()>
Enable the fetch API in the Runtime.
See module [fetch] for more details.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Runtime
impl !RefUnwindSafe for Runtime
impl Unpin for Runtime
impl UnsafeUnpin for Runtime
impl !UnwindSafe for Runtime
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> GetSetFdFlags for T
impl<T> GetSetFdFlags for T
Source§fn get_fd_flags(&self) -> Result<FdFlags, Error>where
T: AsFilelike,
fn get_fd_flags(&self) -> Result<FdFlags, Error>where
T: AsFilelike,
self file descriptor.Source§fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>where
T: AsFilelike,
fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>where
T: AsFilelike,
Source§fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>where
T: AsFilelike,
fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>where
T: AsFilelike,
self file descriptor. Read moreSource§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request