use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, StructArray};
use arrow_schema::{Field, Schema, SchemaRef};
use vgi_rpc::{Result, RpcError};
use crate::ipc;
#[derive(Clone, Default)]
pub struct Arguments {
pub positional: Vec<Option<ArrayRef>>,
pub named: HashMap<String, ArrayRef>,
pub schema: Option<SchemaRef>,
pub positional_fields: Vec<Option<Field>>,
}
impl Arguments {
pub fn parse(bytes: &[u8]) -> Result<Arguments> {
if bytes.is_empty() {
return Ok(Arguments::default());
}
let batch = ipc::read_batch(bytes)?;
let mut args = Arguments::default();
if batch.num_columns() == 1 && batch.schema().field(0).name() == "args" {
let sa = batch
.column(0)
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| RpcError::type_error("'args' column is not a struct"))?;
let mut pos: Vec<(usize, ArrayRef)> = Vec::new();
let mut sfields: Vec<(usize, Field)> = Vec::new();
for (i, field) in sa.fields().iter().enumerate() {
let col = sa.column(i).clone();
let name = field.name();
if let Some(idx) = name.strip_prefix("positional_") {
if let Ok(idx) = idx.parse::<usize>() {
pos.push((idx, col.clone()));
sfields.push((idx, field.as_ref().clone()));
}
args.named.insert(name.clone(), col);
} else if let Some(actual) = name.strip_prefix("named_") {
args.named.insert(actual.to_string(), col.clone());
args.named.insert(name.clone(), col);
} else {
args.named.insert(name.clone(), col);
}
}
if !pos.is_empty() {
let max_idx = pos.iter().map(|(i, _)| *i).max().unwrap();
args.positional = vec![None; max_idx + 1];
args.positional_fields = vec![None; max_idx + 1];
for (i, a) in pos {
args.positional[i] = Some(a);
}
for (i, f) in &sfields {
args.positional_fields[*i] = Some(f.clone());
}
sfields.sort_by_key(|(i, _)| *i);
let fields: Vec<Field> = sfields.into_iter().map(|(_, f)| f).collect();
args.schema = Some(Arc::new(Schema::new(fields)));
}
return Ok(args);
}
for (i, field) in batch.schema().fields().iter().enumerate() {
let col = batch.column(i).clone();
args.named.insert(field.name().clone(), col.clone());
args.positional.push(Some(col));
}
args.schema = Some(batch.schema());
Ok(args)
}
pub fn serialize_positional(values: &[ArrayRef]) -> Result<Vec<u8>> {
use arrow_array::RecordBatch;
if values.is_empty() {
let schema = Arc::new(Schema::new(vec![Field::new(
"args",
arrow_schema::DataType::Struct(arrow_schema::Fields::empty()),
false,
)]));
return crate::ipc::write_schema(&schema);
}
let pairs: Vec<(Arc<Field>, ArrayRef)> = values
.iter()
.enumerate()
.map(|(i, a)| {
(
Arc::new(Field::new(
format!("positional_{i}"),
a.data_type().clone(),
true,
)),
a.clone(),
)
})
.collect();
let sa = StructArray::from(pairs);
let schema = Arc::new(Schema::new(vec![Field::new(
"args",
sa.data_type().clone(),
false,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(sa)])
.map_err(|e| RpcError::runtime_error(format!("serialize args: {e}")))?;
crate::ipc::write_batch(&batch)
}
pub fn serialize_scan_args(values: &[ArrayRef]) -> Result<Vec<u8>> {
Self::serialize_scan_args_named(values, &[])
}
pub fn serialize_scan_args_named(
positional: &[ArrayRef],
named: &[(&str, ArrayRef)],
) -> Result<Vec<u8>> {
use arrow_array::RecordBatch;
if positional.is_empty() && named.is_empty() {
return crate::ipc::write_schema(&Schema::empty());
}
let mut fields: Vec<Field> = positional
.iter()
.enumerate()
.map(|(i, a)| Field::new(format!("arg_{i}"), a.data_type().clone(), false))
.collect();
let mut cols: Vec<ArrayRef> = positional.to_vec();
for (name, a) in named {
fields.push(Field::new(*name, a.data_type().clone(), false));
cols.push(a.clone());
}
let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
.map_err(|e| RpcError::runtime_error(format!("serialize scan args: {e}")))?;
crate::ipc::write_batch(&batch)
}
pub fn arg(&self, pos: usize) -> Option<&ArrayRef> {
self.positional.get(pos).and_then(|o| o.as_ref())
}
pub fn arg_field(&self, pos: usize) -> Option<&Field> {
self.positional_fields.get(pos).and_then(|o| o.as_ref())
}
fn nonnull(&self, pos: usize) -> Option<&ArrayRef> {
let a = self.arg(pos)?;
if a.is_empty() || a.is_null(0) {
None
} else {
Some(a)
}
}
pub fn const_i64(&self, pos: usize) -> Option<i64> {
let a = self.nonnull(pos)?;
crate::numeric::array_value_i64(a, 0)
}
pub fn const_f64(&self, pos: usize) -> Option<f64> {
let a = self.nonnull(pos)?;
crate::numeric::array_value_f64(a, 0)
}
pub fn const_str(&self, pos: usize) -> Option<String> {
let a = self.nonnull(pos)?;
if let Some(s) = a.as_string_opt::<i32>() {
return Some(s.value(0).to_string());
}
if let Some(s) = a.as_string_opt::<i64>() {
return Some(s.value(0).to_string());
}
None
}
pub fn const_bool(&self, pos: usize) -> Option<bool> {
let a = self.nonnull(pos)?;
a.as_boolean_opt().map(|b| b.value(0))
}
pub fn const_bytes(&self, pos: usize) -> Option<Vec<u8>> {
let a = self.nonnull(pos)?;
if let Some(b) = a.as_binary_opt::<i32>() {
return Some(b.value(0).to_vec());
}
if let Some(b) = a.as_binary_opt::<i64>() {
return Some(b.value(0).to_vec());
}
None
}
pub fn num_positional(&self) -> usize {
self.positional.len()
}
fn named_nonnull(&self, name: &str) -> Option<&ArrayRef> {
let a = self.named.get(name)?;
if a.is_empty() || a.is_null(0) {
None
} else {
Some(a)
}
}
pub fn named(&self, name: &str) -> Option<&ArrayRef> {
self.named_nonnull(name)
}
pub fn named_i64(&self, name: &str) -> Option<i64> {
crate::numeric::array_value_i64(self.named_nonnull(name)?, 0)
}
pub fn named_f64(&self, name: &str) -> Option<f64> {
crate::numeric::array_value_f64(self.named_nonnull(name)?, 0)
}
pub fn named_str(&self, name: &str) -> Option<String> {
let a = self.named_nonnull(name)?;
if let Some(s) = a.as_string_opt::<i32>() {
return Some(s.value(0).to_string());
}
a.as_string_opt::<i64>().map(|s| s.value(0).to_string())
}
pub fn named_bool(&self, name: &str) -> Option<bool> {
self.named_nonnull(name)?
.as_boolean_opt()
.map(|b| b.value(0))
}
pub fn remap_positional(&mut self, specs: &[crate::function::ArgSpec]) {
if self.positional.is_empty() || specs.is_empty() {
return;
}
let mut const_positions = Vec::new();
let mut max_pos = 0i32;
for s in specs {
if s.position >= 0 && s.is_const {
const_positions.push(s.position as usize);
}
if s.position > max_pos {
max_pos = s.position;
}
}
if const_positions.len() == specs.len() {
return;
}
if self.positional.len() as i32 > max_pos {
return;
}
let mut expanded: Vec<Option<ArrayRef>> = vec![None; max_pos as usize + 1];
for (i, &orig) in const_positions.iter().enumerate() {
if let Some(a) = self.positional.get(i).and_then(|o| o.clone()) {
expanded[orig] = Some(a);
}
}
self.positional = expanded;
}
}