pub struct Runtime { /* private fields */ }
Expand description
A runtime to execute user defined functions in Python.
§Usages
- Create a new runtime with `Runtime::new` or `Runtime::builder`.
- For scalar functions, use `add_function` and `call`.
- For table functions, use `add_function` and `call_table_function`.
- For aggregate functions, create the function with `add_aggregate`, and then:
  - create a new state with `create_state`,
  - update the state with `accumulate` or `accumulate_or_retract`,
  - merge states with `merge`,
  - finally, get the result with `finish`.

Click on each function to see an example.
§Parallelism
Python's Global Interpreter Lock (GIL) prevents multiple threads from executing Python code simultaneously. To work around this limitation, each runtime creates its own sub-interpreter with a separate GIL. This feature requires Python 3.12 or later.
Implementations§
Source§impl Runtime
impl Runtime
Sourcepub fn add_function(
&mut self,
name: &str,
return_type: impl IntoField,
mode: CallMode,
code: &str,
) -> Result<()>
pub fn add_function( &mut self, name: &str, return_type: impl IntoField, mode: CallMode, code: &str, ) -> Result<()>
Add a new scalar function or table function.
§Arguments
- `name`: The name of the function.
- `return_type`: The data type of the return value.
- `mode`: Whether the function will be called when some of its arguments are null.
- `code`: The Python code of the function.
The code should define a Python function whose name matches `name`. That function should return a value for scalar functions, or yield values for table functions.
§Example
let mut runtime = Runtime::new().unwrap();
// add a scalar function
runtime
.add_function(
"gcd",
DataType::Int32,
CallMode::ReturnNullOnNullInput,
r#"
def gcd(a: int, b: int) -> int:
while b:
a, b = b, a % b
return a
"#,
)
.unwrap();
// add a table function
runtime
.add_function(
"series",
DataType::Int32,
CallMode::ReturnNullOnNullInput,
r#"
def series(n: int):
for i in range(n):
yield i
"#,
)
.unwrap();
Sourcepub fn add_function_with_handler(
&mut self,
name: &str,
return_type: impl IntoField,
mode: CallMode,
code: &str,
handler: &str,
) -> Result<()>
pub fn add_function_with_handler( &mut self, name: &str, return_type: impl IntoField, mode: CallMode, code: &str, handler: &str, ) -> Result<()>
Add a new scalar function or table function with a custom handler name.
§Arguments
- `handler`: The name of the function in the Python code to be called.
- Other arguments: Same as `add_function`.
Sourcepub fn add_aggregate(
&mut self,
name: &str,
state_type: impl IntoField,
output_type: impl IntoField,
mode: CallMode,
code: &str,
) -> Result<()>
pub fn add_aggregate( &mut self, name: &str, state_type: impl IntoField, output_type: impl IntoField, mode: CallMode, code: &str, ) -> Result<()>
Add a new aggregate function from Python code.
§Arguments
- `name`: The name of the function.
- `state_type`: The data type of the internal state.
- `output_type`: The data type of the aggregate value.
- `mode`: Whether the function will be called when some of its arguments are null.
- `code`: The Python code of the aggregate function.

The code should define at least two functions:
- `create_state() -> state`: Create a new state object.
- `accumulate(state, *args) -> state`: Accumulate a new value into the state, returning the updated state.

Optionally, the code can also define:
- `finish(state) -> value`: Get the result of the aggregate function. If not defined, the state is returned as the result; in this case, `output_type` must be the same as `state_type`.
- `retract(state, *args) -> state`: Retract a value from the state, returning the updated state.
- `merge(state, state) -> state`: Merge two states, returning the merged state.
§Example
let mut runtime = Runtime::new().unwrap();
runtime
.add_aggregate(
"sum",
DataType::Int32, // state_type
DataType::Int32, // output_type
CallMode::ReturnNullOnNullInput,
r#"
def create_state():
return 0
def accumulate(state, value):
return state + value
def retract(state, value):
return state - value
def merge(state1, state2):
return state1 + state2
"#,
)
.unwrap();
Sourcepub fn del_function(&mut self, name: &str) -> Result<()>
pub fn del_function(&mut self, name: &str) -> Result<()>
Remove a scalar or table function.
Sourcepub fn del_aggregate(&mut self, name: &str) -> Result<()>
pub fn del_aggregate(&mut self, name: &str) -> Result<()>
Remove an aggregate function.
Sourcepub fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
pub fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
Call a scalar function.
§Example
// suppose we have created a scalar function `gcd`
// see the example in `add_function`
let schema = Schema::new(vec![
Field::new("x", DataType::Int32, true),
Field::new("y", DataType::Int32, true),
]);
let arg0 = Int32Array::from(vec![Some(25), None]);
let arg1 = Int32Array::from(vec![Some(15), None]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0), Arc::new(arg1)]).unwrap();
let output = runtime.call("gcd", &input).unwrap();
assert_eq!(&**output.column(0), &Int32Array::from(vec![Some(5), None]));
Sourcepub fn call_table_function<'a>(
&'a self,
name: &'a str,
input: &'a RecordBatch,
chunk_size: usize,
) -> Result<RecordBatchIter<'a>>
pub fn call_table_function<'a>( &'a self, name: &'a str, input: &'a RecordBatch, chunk_size: usize, ) -> Result<RecordBatchIter<'a>>
Call a table function.
§Example
// suppose we have created a table function `series`
// see the example in `add_function`
let schema = Schema::new(vec![Field::new("x", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let mut outputs = runtime.call_table_function("series", &input, 10).unwrap();
let output = outputs.next().unwrap().unwrap();
let pretty = arrow_cast::pretty::pretty_format_batches(&[output]).unwrap().to_string();
assert_eq!(pretty, r#"
+-----+--------+
| row | series |
+-----+--------+
| 0 | 0 |
| 2 | 0 |
| 2 | 1 |
| 2 | 2 |
+-----+--------+"#.trim());
Sourcepub fn create_state(&self, name: &str) -> Result<ArrayRef>
pub fn create_state(&self, name: &str) -> Result<ArrayRef>
Create a new state for an aggregate function.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
assert_eq!(&*state, &Int32Array::from(vec![0]));
Sourcepub fn accumulate(
&self,
name: &str,
state: &dyn Array,
input: &RecordBatch,
) -> Result<ArrayRef>
pub fn accumulate( &self, name: &str, state: &dyn Array, input: &RecordBatch, ) -> Result<ArrayRef>
Call the `accumulate` function of an aggregate function.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate("sum", &state, &input).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));
Sourcepub fn accumulate_or_retract(
&self,
name: &str,
state: &dyn Array,
ops: &BooleanArray,
input: &RecordBatch,
) -> Result<ArrayRef>
pub fn accumulate_or_retract( &self, name: &str, state: &dyn Array, ops: &BooleanArray, input: &RecordBatch, ) -> Result<ArrayRef>
Call the `accumulate` or `retract` function of an aggregate function for each row.
`ops` is a boolean array that indicates, for each row, whether to accumulate or retract it: `false` means accumulate and `true` means retract.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let ops = BooleanArray::from(vec![false, false, true, false]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate_or_retract("sum", &state, &ops, &input).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![3]));
Sourcepub fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
pub fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
Merge states of an aggregate function.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let states = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let state = runtime.merge("sum", &states).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));
Sourcepub fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
pub fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
Get the result of an aggregate function.
If the `finish` function is not defined, the state is returned as the result.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let states: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(5)]));
let outputs = runtime.finish("sum", &states).unwrap();
assert_eq!(&outputs, &states);