use std::sync::Arc;
use crate::buffer::BufferReader;
use crate::layout::{OffsetTable, SchemaLayout};
use crate::schema_canonical::{Field, Schema, TypeRepr};
use crate::smol_str::SmolStr;
use crate::value::Value;
use crate::verifier::{verify_record_multi, verify_value_at_multi, MultiRegion, VerifyError};
use crate::RuntimeError;
/// Boundaries of the four regions a backend arena is divided into.
///
/// The values are offsets/lengths into the arena slice handed to the decoders
/// in this file; [`ArenaRegions::multi_region`] turns them into the
/// `(start, end)` pairs the verifier works with.
#[derive(Debug, Clone, Copy)]
pub struct ArenaRegions {
    /// End of the first region, which always starts at offset 0.
    pub const_data_len: usize,
    /// Start offset of the input region.
    pub in_ptr: u32,
    /// Length of the input region.
    pub in_len: u32,
    /// Start offset of the output region.
    pub out_ptr: u32,
    /// Capacity of the output region, measured from `out_ptr`.
    pub out_cap: u32,
    /// Start offset of the scratch region, which runs up to `arena_size`.
    pub scratch_base: u32,
    /// Total arena size; doubles as the end of the scratch region.
    pub arena_size: usize,
}

impl ArenaRegions {
    /// Builds the [`MultiRegion`] describing this arena.
    ///
    /// The regions are passed to `MultiRegion::new` in the order
    /// const-data, input, output, scratch.
    ///
    /// # Errors
    ///
    /// Returns whatever `MultiRegion::new` reports for the computed ranges.
    pub fn multi_region(&self) -> Result<MultiRegion, VerifyError> {
        // Converts a `(start, length)` pair into a `(start, end)` pair.
        let span = |start: u32, len: u32| {
            let begin = start as usize;
            (begin, begin + len as usize)
        };
        let input = span(self.in_ptr, self.in_len);
        let output = span(self.out_ptr, self.out_cap);
        let scratch = (self.scratch_base as usize, self.arena_size);
        MultiRegion::new((0, self.const_data_len), input, output, scratch)
    }
}
/// Recovers the root offset encoded in an in-place return sentinel.
///
/// The sentinel stores the offset as `-(root + 1)`, so `-1` decodes to `0`,
/// `-2` to `1`, and so on. Any non-negative `ret` is not a valid sentinel.
///
/// # Errors
///
/// Returns `RuntimeError::IoError` when the decoded offset is negative or does
/// not fit in `usize`.
pub fn decode_inplace_sentinel(ret: i32) -> Result<usize, RuntimeError> {
    // Widen before negating so `i32::MIN` cannot overflow.
    let root = -i64::from(ret) - 1;
    match usize::try_from(root) {
        Ok(offset) => Ok(offset),
        Err(_) if root < 0 => Err(RuntimeError::IoError(format!(
            "in-place return sentinel {ret} decodes to a negative root offset"
        ))),
        Err(_) => Err(RuntimeError::IoError(format!(
            "in-place return sentinel {ret} decodes to an out-of-range root offset"
        ))),
    }
}
pub fn decode_inplace_return(
backend: &str,
arena: &[u8],
regions: ArenaRegions,
root_abs: usize,
return_field: &Field,
return_layout: &OffsetTable,
return_fields: &[Field],
) -> Result<Value, RuntimeError> {
#[allow(clippy::enum_variant_names)]
enum InplaceShape<'a> {
ListListScalar(TypeRepr),
ListString,
ListSchema(&'a Schema),
ListListPointerArray(&'a TypeRepr),
ListVariant(&'a TypeRepr),
}
let shape = match &return_field.ty {
TypeRepr::List { element } => match element.as_ref() {
TypeRepr::List { element: inner } => match inner.as_ref() {
TypeRepr::Int | TypeRepr::Float | TypeRepr::Bool => {
InplaceShape::ListListScalar(inner.as_ref().clone())
}
_ => InplaceShape::ListListPointerArray(element.as_ref()),
},
TypeRepr::String => InplaceShape::ListString,
TypeRepr::Schema { schema } => InplaceShape::ListSchema(schema.as_ref()),
TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
InplaceShape::ListVariant(element.as_ref())
}
other => {
return Err(RuntimeError::IoError(format!(
"{backend} in-place return: unsupported list element {other:?} \
(expected List<scalar>, String, Schema, Option, Result, or Enum)"
)));
}
},
other => {
return Err(RuntimeError::IoError(format!(
"{backend} in-place return: expected a pointer-array List, got {other:?}"
)));
}
};
let arena_size = regions.arena_size;
if arena_size > arena.len() {
return Err(RuntimeError::IoError(format!(
"{backend} in-place return arena_size {arena_size} exceeds arena slice {}",
arena.len()
)));
}
let arena = &arena[..arena_size];
let multi = regions.multi_region().map_err(|e| {
RuntimeError::IoError(format!(
"{backend} in-place return arena regions invalid: {e}"
))
})?;
if root_abs >= arena_size {
return Err(RuntimeError::IoError(format!(
"{backend} in-place return root {root_abs} is past arena end {arena_size}"
)));
}
let list_element = return_layout.fields.first().and_then(|fo| fo.list_element);
verify_value_at_multi(arena, &return_field.ty, list_element, root_abs, multi).map_err(|e| {
RuntimeError::IoError(format!(
"{backend} in-place return verifier rejected the buffer (root_abs={root_abs}): {e}"
))
})?;
let reader = BufferReader::new(return_layout, return_fields, arena)
.map_err(|e| RuntimeError::IoError(format!("{backend} buffer: {e}")))?;
match shape {
InplaceShape::ListListScalar(inner) => {
let rows = reader
.read_list_list_at(root_abs, &inner)
.map_err(|e| RuntimeError::IoError(format!("{backend} buffer: {e}")))?;
Ok(Value::List(Arc::new(
rows.into_iter().map(|r| Value::List(Arc::new(r))).collect(),
)))
}
InplaceShape::ListString => {
let items = reader
.read_list_string_at(root_abs)
.map_err(|e| RuntimeError::IoError(format!("{backend} buffer: {e}")))?;
Ok(Value::List(Arc::new(
items.into_iter().map(|s| Value::String(s.into())).collect(),
)))
}
InplaceShape::ListListPointerArray(outer_element) => {
let rows = reader
.read_list_value_at(root_abs, outer_element)
.map_err(|e| RuntimeError::IoError(format!("{backend} buffer: {e}")))?;
Ok(Value::List(Arc::new(rows)))
}
InplaceShape::ListVariant(element) => {
let rows = reader
.read_list_value_at(root_abs, element)
.map_err(|e| RuntimeError::IoError(format!("{backend} buffer: {e}")))?;
Ok(Value::List(Arc::new(rows)))
}
InplaceShape::ListSchema(schema) => {
let elem_layout = SchemaLayout::offsets_for(schema).map_err(|e| {
RuntimeError::IoError(format!(
"{backend} in-place List<Schema> element `{}` layout: {e}",
schema.name
))
})?;
let sub_readers = reader
.read_list_record_at(root_abs, &elem_layout, schema)
.map_err(|e| RuntimeError::IoError(format!("{backend} buffer: {e}")))?;
let mut items = Vec::with_capacity(sub_readers.len());
for sub in &sub_readers {
if schema.is_tuple {
items.push(read_tuple_record_into_tuple(backend, sub, schema)?);
} else {
let map = read_record_into_branded_map(backend, sub, schema)?;
items.push(Value::branded_dict(map, Some(schema.name.clone())));
}
}
Ok(Value::List(Arc::new(items)))
}
}
}
/// Runs the multi-region record verifier over an object return.
///
/// Delegates to `verify_record_multi` for the record at `record_base` and
/// re-labels a rejection with the backend name and the base offset.
///
/// # Errors
///
/// Returns `RuntimeError::IoError` carrying the verifier's message when the
/// record is rejected.
pub fn verify_object_return_multi(
    backend: &str,
    arena: &[u8],
    record_base: usize,
    multi: MultiRegion,
    return_layout: &OffsetTable,
    return_fields: &[Field],
) -> Result<(), RuntimeError> {
    match verify_record_multi(arena, return_layout, return_fields, record_base, multi) {
        Ok(()) => Ok(()),
        Err(e) => Err(RuntimeError::IoError(format!(
            "{backend} cross-region object return verifier rejected the arena (base={record_base}): {e}"
        ))),
    }
}
/// Verifies and decodes an object (record) return located at `record_base`.
///
/// The record is first checked by [`verify_object_return_multi`] against the
/// regions in `regions`, then read through a `BufferReader` anchored at
/// `record_base`. The shape of the result depends on the schema:
///
/// * `single_value_wrapper` — only the schema's first field is decoded and
///   returned on its own;
/// * `return_schema.is_tuple` — a `Value::Tuple` of all fields, in order;
/// * otherwise — a branded dict named after the schema.
///
/// # Errors
///
/// Returns `RuntimeError::IoError` when the regions are invalid, the verifier
/// rejects the record, the reader cannot be built, a field fails to decode, or
/// `single_value_wrapper` is set for a schema with no fields.
#[allow(clippy::too_many_arguments)]
pub fn decode_object_return(
    backend: &str,
    arena: &[u8],
    record_base: usize,
    regions: ArenaRegions,
    return_layout: &OffsetTable,
    return_schema: &Schema,
    single_value_wrapper: bool,
) -> Result<Value, RuntimeError> {
    let multi = regions.multi_region().map_err(|e| {
        RuntimeError::IoError(format!(
            "{backend} object return arena regions invalid: {e}"
        ))
    })?;
    verify_object_return_multi(
        backend,
        arena,
        record_base,
        multi,
        return_layout,
        &return_schema.fields,
    )?;
    let reader =
        BufferReader::new_at_base(return_layout, &return_schema.fields, arena, record_base)
            .map_err(|e| RuntimeError::IoError(format!("{backend} buffer: {e}")))?;
    if single_value_wrapper {
        // A wrapper schema without fields is a caller/schema mismatch; report it
        // as an error instead of panicking on an out-of-bounds index.
        let field = return_schema.fields.first().ok_or_else(|| {
            RuntimeError::IoError(format!(
                "{backend} object return: single-value wrapper schema `{}` has no fields",
                return_schema.name
            ))
        })?;
        read_object_field(backend, &reader, field, return_schema)
    } else if return_schema.is_tuple {
        read_tuple_record_into_tuple(backend, &reader, return_schema)
    } else {
        let map = read_object_record_into_map(backend, &reader, return_schema)?;
        Ok(Value::branded_dict(map, Some(return_schema.name.clone())))
    }
}
/// Decodes every field of `schema` from `reader`, in declaration order, into a
/// `Value::Tuple`.
///
/// Stops at the first field that fails to decode and returns that error.
fn read_tuple_record_into_tuple(
    backend: &str,
    reader: &BufferReader<'_>,
    schema: &Schema,
) -> Result<Value, RuntimeError> {
    let items = schema
        .fields
        .iter()
        .map(|field| read_object_field(backend, reader, field, schema))
        .collect::<Result<Vec<Value>, RuntimeError>>()?;
    Ok(Value::Tuple(Arc::new(items)))
}
/// Decodes every field of `schema` from `reader` into a map keyed by field name.
///
/// Fields are decoded with `read_object_field` in declaration order; the first
/// failure aborts the read and is returned as-is.
fn read_object_record_into_map(
    backend: &str,
    reader: &BufferReader<'_>,
    schema: &Schema,
) -> Result<std::collections::BTreeMap<SmolStr, Value>, RuntimeError> {
    schema.fields.iter().try_fold(
        std::collections::BTreeMap::new(),
        |mut decoded, field| {
            let value = read_object_field(backend, reader, field, schema)?;
            // Inserting in declaration order means a later duplicate name wins.
            decoded.insert(SmolStr::from(field.name.as_str()), value);
            Ok::<_, RuntimeError>(decoded)
        },
    )
}
/// Decodes one named field of an object-return record into a [`Value`].
///
/// Dispatch is on `field.ty`:
///
/// * `Int` / `Float` / `Bool` / `String` — read directly;
/// * `Unit` — read, then represented as `Value::option_none()`;
/// * `Schema` — read as a sub-record; tuple schemas become `Value::Tuple`,
///   all others a branded dict named after the schema;
/// * `Option` / `Result` / `Enum` — delegated to the reader's generic value
///   decoder;
/// * `List` — scalar, string, schema, variant and nested-list elements each
///   have a dedicated read path; any other element type is an error.
///
/// `parent_schema` is only used to name the enclosing schema in error messages.
///
/// # Errors
///
/// Returns `RuntimeError::IoError` for buffer read failures, sub-schema layout
/// failures, and field or element types that have no decode path.
fn read_object_field(
    backend: &str,
    reader: &BufferReader<'_>,
    field: &Field,
    parent_schema: &Schema,
) -> Result<Value, RuntimeError> {
    let map_err =
        |e: crate::buffer::BufferError| RuntimeError::IoError(format!("{backend} buffer: {e}"));
    match &field.ty {
        TypeRepr::Int => reader
            .read_int(&field.name)
            .map(Value::Int)
            .map_err(map_err),
        TypeRepr::Float => reader
            .read_float(&field.name)
            .map(|f| Value::Float(ordered_float::OrderedFloat(f)))
            .map_err(map_err),
        TypeRepr::Bool => reader
            .read_bool(&field.name)
            .map(Value::Bool)
            .map_err(map_err),
        TypeRepr::Unit => reader
            .read_unit(&field.name)
            .map(|()| Value::option_none())
            .map_err(map_err),
        TypeRepr::String => reader
            .read_string(&field.name)
            .map(|s| Value::String(s.into()))
            .map_err(map_err),
        TypeRepr::Schema { schema } => {
            let sub_layout = SchemaLayout::offsets_for(schema).map_err(|e| {
                RuntimeError::IoError(format!(
                    "{backend} object return field `{}` layout: {e}",
                    field.name
                ))
            })?;
            let sub_reader = reader
                .sub_record(&field.name, &sub_layout, &schema.fields)
                .map_err(map_err)?;
            // Tuple schemas decode to `Value::Tuple` here exactly as they do for
            // list elements below and for top-level returns; only non-tuple
            // schemas become branded dicts.
            if schema.is_tuple {
                read_tuple_record_into_tuple(backend, &sub_reader, schema)
            } else {
                let map = read_object_record_into_map(backend, &sub_reader, schema)?;
                Ok(Value::branded_dict(map, Some(schema.name.clone())))
            }
        }
        TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
            reader.read_value(&field.name, &field.ty).map_err(map_err)
        }
        TypeRepr::List { element } => match element.as_ref() {
            TypeRepr::Int => reader
                .read_list_int(&field.name)
                .map(|v| Value::List(Arc::new(v.into_iter().map(Value::Int).collect())))
                .map_err(map_err),
            TypeRepr::Float => reader
                .read_list_float(&field.name)
                .map(|v| {
                    Value::List(Arc::new(
                        v.into_iter()
                            .map(|f| Value::Float(ordered_float::OrderedFloat(f)))
                            .collect(),
                    ))
                })
                .map_err(map_err),
            TypeRepr::Bool => reader
                .read_list_bool(&field.name)
                .map(|v| Value::List(Arc::new(v.into_iter().map(Value::Bool).collect())))
                .map_err(map_err),
            TypeRepr::String => reader
                .read_list_string(&field.name)
                .map(|v| {
                    Value::List(Arc::new(
                        v.into_iter().map(|s| Value::String(s.into())).collect(),
                    ))
                })
                .map_err(map_err),
            TypeRepr::Schema { schema } => {
                let elem_layout = SchemaLayout::offsets_for(schema).map_err(|e| {
                    RuntimeError::IoError(format!(
                        "{backend} object return List<Schema> element `{}` layout: {e}",
                        schema.name
                    ))
                })?;
                let sub_readers = reader
                    .read_list_record(&field.name, &elem_layout, schema)
                    .map_err(map_err)?;
                let mut items = Vec::with_capacity(sub_readers.len());
                for sub in &sub_readers {
                    if schema.is_tuple {
                        items.push(read_tuple_record_into_tuple(backend, sub, schema)?);
                    } else {
                        let map = read_object_record_into_map(backend, sub, schema)?;
                        items.push(Value::branded_dict(map, Some(schema.name.clone())));
                    }
                }
                Ok(Value::List(Arc::new(items)))
            }
            TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => reader
                .read_list_value(&field.name, element.as_ref())
                .map(|rows| Value::List(Arc::new(rows)))
                .map_err(map_err),
            TypeRepr::List { element: inner } => match inner.as_ref() {
                // Rows of scalars have a dedicated reader; every other nested
                // list goes through the generic per-element value reader.
                TypeRepr::Int | TypeRepr::Float | TypeRepr::Bool => reader
                    .read_list_list(&field.name)
                    .map(|rows| {
                        Value::List(Arc::new(
                            rows.into_iter().map(|r| Value::List(Arc::new(r))).collect(),
                        ))
                    })
                    .map_err(map_err),
                _ => reader
                    .read_list_value(&field.name, element.as_ref())
                    .map(|rows| Value::List(Arc::new(rows)))
                    .map_err(map_err),
            },
            other => Err(RuntimeError::IoError(format!(
                "{backend} object return: cannot decode list field `{field}` of element type \
                 `{ty:?}` in schema `{schema}`",
                field = field.name,
                ty = other,
                schema = parent_schema.name,
            ))),
        },
        other => Err(RuntimeError::IoError(format!(
            "{backend} object return: cannot decode field `{field}` of type `{ty:?}` in schema \
             `{schema}`",
            field = field.name,
            ty = other,
            schema = parent_schema.name,
        ))),
    }
}
/// Decodes every field of an in-place `List<Schema>` element into a map keyed
/// by field name.
///
/// Fields are decoded with `read_record_field` in declaration order; the first
/// failure aborts the read and is returned as-is.
fn read_record_into_branded_map(
    backend: &str,
    reader: &BufferReader<'_>,
    schema: &Schema,
) -> Result<std::collections::BTreeMap<SmolStr, Value>, RuntimeError> {
    schema.fields.iter().try_fold(
        std::collections::BTreeMap::new(),
        |mut decoded, field| {
            let value = read_record_field(backend, reader, field)?;
            // Inserting in declaration order means a later duplicate name wins.
            decoded.insert(SmolStr::from(field.name.as_str()), value);
            Ok::<_, RuntimeError>(decoded)
        },
    )
}
/// Decodes one field of an in-place `List<Schema>` element into a [`Value`].
///
/// Scalars, strings and unit are read directly (`Unit` is represented as
/// `Value::option_none()`); `Option`/`Result`/`Enum` and every `List` element
/// type go through the reader's generic value decoders. Any other field type
/// has no decode path here.
///
/// # Errors
///
/// Returns `RuntimeError::IoError` naming the field when the buffer read fails
/// or the field type is unsupported.
fn read_record_field(
    backend: &str,
    reader: &BufferReader<'_>,
    field: &Field,
) -> Result<Value, RuntimeError> {
    let name = field.name.as_str();
    // Produce the raw buffer result first; the error is wrapped once below.
    let decoded: Result<Value, crate::buffer::BufferError> = match &field.ty {
        TypeRepr::String => reader.read_string(name).map(|s| Value::String(s.into())),
        TypeRepr::Bool => reader.read_bool(name).map(Value::Bool),
        TypeRepr::Int => reader.read_int(name).map(Value::Int),
        TypeRepr::Float => reader
            .read_float(name)
            .map(|f| Value::Float(ordered_float::OrderedFloat(f))),
        TypeRepr::Unit => reader.read_unit(name).map(|()| Value::option_none()),
        TypeRepr::List { element } => reader
            .read_list_value(name, element.as_ref())
            .map(|rows| Value::List(Arc::new(rows))),
        TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
            reader.read_value(name, &field.ty)
        }
        other => {
            return Err(RuntimeError::IoError(format!(
                "{backend} in-place List<Schema> sub-record field `{name}` has unsupported type \
                 {other:?}"
            )));
        }
    };
    decoded.map_err(|e| {
        RuntimeError::IoError(format!(
            "{backend} in-place List<Schema> field `{name}`: {e}"
        ))
    })
}