pub struct Runtime { /* private fields */ }
Expand description
A runtime to execute user defined functions in Python.
§Usages
- Create a new runtime with `Runtime::new` or `Runtime::builder`.
- For scalar functions, use `add_function` and `call`.
- For table functions, use `add_function` and `call_table_function`.
- For aggregate functions, create the function with `add_aggregate`, and then:
  - create a new state with `create_state`,
  - update the state with `accumulate` or `accumulate_or_retract`,
  - merge states with `merge`,
  - finally, get the result with `finish`.
Click on each function to see an example of its usage.
§Parallelism
Python has a Global Interpreter Lock (GIL) that prevents multiple threads from executing Python code simultaneously. To work around this limitation, each runtime creates a sub-interpreter with its own GIL. This feature requires Python 3.12 or later.
Implementations§
Source§impl Runtime
impl Runtime
Sourcepub fn add_function(
&mut self,
name: &str,
return_type: impl IntoField,
mode: CallMode,
code: &str,
) -> Result<()>
pub fn add_function( &mut self, name: &str, return_type: impl IntoField, mode: CallMode, code: &str, ) -> Result<()>
Add a new scalar function or table function.
§Arguments
- `name`: The name of the function.
- `return_type`: The data type of the return value.
- `mode`: Whether the function will be called when some of its arguments are null.
- `code`: The Python code of the function.
The code should define a Python function whose name matches `name`. The function should return a value for scalar functions, or yield values for table functions.
§Example
let mut runtime = Runtime::new().unwrap();
// add a scalar function
runtime
.add_function(
"gcd",
DataType::Int32,
CallMode::ReturnNullOnNullInput,
r#"
def gcd(a: int, b: int) -> int:
while b:
a, b = b, a % b
return a
"#,
)
.unwrap();
// add a table function
runtime
.add_function(
"series",
DataType::Int32,
CallMode::ReturnNullOnNullInput,
r#"
def series(n: int):
for i in range(n):
yield i
"#,
)
.unwrap();
Sourcepub fn add_function_with_handler(
&mut self,
name: &str,
return_type: impl IntoField,
mode: CallMode,
code: &str,
handler: &str,
) -> Result<()>
pub fn add_function_with_handler( &mut self, name: &str, return_type: impl IntoField, mode: CallMode, code: &str, handler: &str, ) -> Result<()>
Add a new scalar function or table function with a custom handler name.
§Arguments
- `handler`: The name of the function in the Python code to be called.
- others: Same as `add_function`.
Sourcepub fn add_aggregate(
&mut self,
name: &str,
state_type: impl IntoField,
output_type: impl IntoField,
mode: CallMode,
code: &str,
) -> Result<()>
pub fn add_aggregate( &mut self, name: &str, state_type: impl IntoField, output_type: impl IntoField, mode: CallMode, code: &str, ) -> Result<()>
Add a new aggregate function from Python code.
§Arguments
- `name`: The name of the function.
- `state_type`: The data type of the internal state.
- `output_type`: The data type of the aggregate value.
- `mode`: Whether the function will be called when some of its arguments are null.
- `code`: The Python code of the aggregate function.
The code should define at least two functions:
- `create_state() -> state`: Create a new state object.
- `accumulate(state, *args) -> state`: Accumulate a new value into the state, returning the updated state.

Optionally, the code can define:
- `finish(state) -> value`: Get the result of the aggregate function. If not defined, the state is returned as the result. In this case, `output_type` must be the same as `state_type`.
- `retract(state, *args) -> state`: Retract a value from the state, returning the updated state.
- `merge(state, state) -> state`: Merge two states, returning the merged state.
§Example
let mut runtime = Runtime::new().unwrap();
runtime
.add_aggregate(
"sum",
DataType::Int32, // state_type
DataType::Int32, // output_type
CallMode::ReturnNullOnNullInput,
r#"
def create_state():
return 0
def accumulate(state, value):
return state + value
def retract(state, value):
return state - value
def merge(state1, state2):
return state1 + state2
"#,
)
.unwrap();
Sourcepub fn del_function(&mut self, name: &str) -> Result<()>
pub fn del_function(&mut self, name: &str) -> Result<()>
Remove a scalar or table function.
Sourcepub fn del_aggregate(&mut self, name: &str) -> Result<()>
pub fn del_aggregate(&mut self, name: &str) -> Result<()>
Remove an aggregate function.
Sourcepub fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
pub fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch>
Call a scalar function.
§Example
// suppose we have created a scalar function `gcd`
// see the example in `add_function`
let schema = Schema::new(vec![
Field::new("x", DataType::Int32, true),
Field::new("y", DataType::Int32, true),
]);
let arg0 = Int32Array::from(vec![Some(25), None]);
let arg1 = Int32Array::from(vec![Some(15), None]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0), Arc::new(arg1)]).unwrap();
let output = runtime.call("gcd", &input).unwrap();
assert_eq!(&**output.column(0), &Int32Array::from(vec![Some(5), None]));
Sourcepub fn call_table_function<'a>(
&'a self,
name: &'a str,
input: &'a RecordBatch,
chunk_size: usize,
) -> Result<RecordBatchIter<'a>>
pub fn call_table_function<'a>( &'a self, name: &'a str, input: &'a RecordBatch, chunk_size: usize, ) -> Result<RecordBatchIter<'a>>
Call a table function.
§Example
// suppose we have created a table function `series`
// see the example in `add_function`
let schema = Schema::new(vec![Field::new("x", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let mut outputs = runtime.call_table_function("series", &input, 10).unwrap();
let output = outputs.next().unwrap().unwrap();
let pretty = arrow_cast::pretty::pretty_format_batches(&[output]).unwrap().to_string();
assert_eq!(pretty, r#"
+-----+--------+
| row | series |
+-----+--------+
| 0 | 0 |
| 2 | 0 |
| 2 | 1 |
| 2 | 2 |
+-----+--------+"#.trim());
Sourcepub fn create_state(&self, name: &str) -> Result<ArrayRef>
pub fn create_state(&self, name: &str) -> Result<ArrayRef>
Create a new state for an aggregate function.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
assert_eq!(&*state, &Int32Array::from(vec![0]));
Sourcepub fn accumulate(
&self,
name: &str,
state: &dyn Array,
input: &RecordBatch,
) -> Result<ArrayRef>
pub fn accumulate( &self, name: &str, state: &dyn Array, input: &RecordBatch, ) -> Result<ArrayRef>
Call the `accumulate` function of an aggregate function on each row of `input`, returning the updated state.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate("sum", &state, &input).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));
Sourcepub fn accumulate_or_retract(
&self,
name: &str,
state: &dyn Array,
ops: &BooleanArray,
input: &RecordBatch,
) -> Result<ArrayRef>
pub fn accumulate_or_retract( &self, name: &str, state: &dyn Array, ops: &BooleanArray, input: &RecordBatch, ) -> Result<ArrayRef>
Call the `accumulate` or `retract` function of an aggregate function for each row.
The `ops` argument is a boolean array that indicates whether to accumulate or retract each row: `false` for accumulate and `true` for retract.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let state = runtime.create_state("sum").unwrap();
let schema = Schema::new(vec![Field::new("value", DataType::Int32, true)]);
let arg0 = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let ops = BooleanArray::from(vec![false, false, true, false]);
let input = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arg0)]).unwrap();
let state = runtime.accumulate_or_retract("sum", &state, &ops, &input).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![3]));
Sourcepub fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
pub fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef>
Merge states of an aggregate function.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let states = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
let state = runtime.merge("sum", &states).unwrap();
assert_eq!(&*state, &Int32Array::from(vec![9]));
Sourcepub fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
pub fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef>
Get the result of an aggregate function.
If the `finish` function is not defined, the state is returned as the result.
§Example
// suppose we have created an aggregate function `sum`
// see the example in `add_aggregate`
let states: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(5)]));
let outputs = runtime.finish("sum", &states).unwrap();
assert_eq!(&outputs, &states);
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Runtime
impl !RefUnwindSafe for Runtime
impl Send for Runtime
impl Sync for Runtime
impl Unpin for Runtime
impl UnwindSafe for Runtime
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> GetSetFdFlags for T
impl<T> GetSetFdFlags for T
Source§fn get_fd_flags(&self) -> Result<FdFlags, Error>where
T: AsFilelike,
fn get_fd_flags(&self) -> Result<FdFlags, Error>where
T: AsFilelike,
self
file descriptor.Source§fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>where
T: AsFilelike,
fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>where
T: AsFilelike,
Source§fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>where
T: AsFilelike,
fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>where
T: AsFilelike,
self
file descriptor. Read moreSource§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left` is `true`.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left(&self)` returns `true`.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`.