use std::collections::HashMap;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::SchemaRef;
use vgi_rpc::{Result, RpcError};
use crate::arguments::Arguments;
use crate::function::{ArgSpec, BindResponse, FunctionMetadata};
use crate::settings::Settings;
/// The column name `"__vgi_group_id"`.
///
/// NOTE(review): presumably the name of the column that carries group ids in
/// batches exchanged with aggregate functions; nothing in the visible part of
/// this file references it, so confirm against the callers.
pub const GROUP_COLUMN_NAME: &str = "__vgi_group_id";
/// Input bundle passed by reference to [`AggregateFunction::on_bind`].
pub struct AggregateBindParams {
/// Arguments for the bind call.
pub arguments: Arguments,
/// Input schema; optional, so implementations must handle `None`.
// NOTE(review): the condition under which this is `None` is not visible here — confirm.
pub input_schema: Option<SchemaRef>,
/// Settings for the bind call.
pub settings: Settings,
}
pub trait AggregateFunction: Send + Sync {
fn name(&self) -> &str;
fn metadata(&self) -> FunctionMetadata;
fn argument_specs(&self) -> Vec<ArgSpec>;
fn on_bind(&self, params: &AggregateBindParams) -> Result<BindResponse>;
fn initial_state(&self) -> Vec<u8>;
fn update(
&self,
states: &mut HashMap<i64, Vec<u8>>,
group_ids: &Int64Array,
columns: &[ArrayRef],
) -> Result<()>;
fn combine(&self, target: Vec<u8>, source: Vec<u8>) -> Result<Vec<u8>>;
fn finalize(
&self,
output_schema: &SchemaRef,
group_ids: &Int64Array,
states: &[Option<Vec<u8>>],
) -> Result<RecordBatch>;
fn window(
&self,
_partition: &RecordBatch,
_output_schema: &SchemaRef,
_frames: &[Vec<(i64, i64)>],
_filter_mask: Option<&[bool]>,
) -> Result<arrow_array::ArrayRef> {
Err(RpcError::runtime_error(
"window() not supported by this aggregate",
))
}
fn streaming_chunk(
&self,
_chunk: &RecordBatch,
_partition_key_count: usize,
_order_key_count: usize,
_states: &mut HashMap<Vec<u8>, Vec<u8>>,
) -> Result<ArrayRef> {
Err(RpcError::runtime_error(
"streaming_chunk() not supported by this aggregate",
))
}
fn finalize_with_args(
&self,
output_schema: &SchemaRef,
group_ids: &Int64Array,
states: &[Option<Vec<u8>>],
_args: &crate::arguments::Arguments,
) -> Result<RecordBatch> {
self.finalize(output_schema, group_ids, states)
}
}