use core::marker::PhantomData;
use reifydb_abi::data::column::ColumnTypeCode;
use crate::{
error::FFIError,
operator::builder::{ColumnBuilder, ColumnsBuilder, CommittedColumn},
};
pub struct ScalarWriter<'a, T: Copy> {
inner: ColumnBuilder<'a>,
cursor: usize,
capacity: usize,
defined: Option<Vec<bool>>,
_t: PhantomData<T>,
}
impl<'a, T: Copy> ScalarWriter<'a, T> {
fn new(inner: ColumnBuilder<'a>, capacity: usize) -> Self {
Self {
inner,
cursor: 0,
capacity,
defined: None,
_t: PhantomData,
}
}
#[inline]
pub fn push(&mut self, v: T) {
debug_assert!(self.cursor < self.capacity, "ScalarWriter::push past capacity");
unsafe {
let data = self.inner.data_ptr() as *mut T;
core::ptr::write_unaligned(data.add(self.cursor), v);
}
if let Some(d) = self.defined.as_mut() {
d.push(true);
}
self.cursor += 1;
}
#[inline]
pub fn push_none(&mut self)
where
T: Default,
{
debug_assert!(self.cursor < self.capacity, "ScalarWriter::push_none past capacity");
unsafe {
let data = self.inner.data_ptr() as *mut T;
core::ptr::write_unaligned(data.add(self.cursor), T::default());
}
let d = self.defined.get_or_insert_with(|| vec![true; self.cursor]);
d.push(false);
self.cursor += 1;
}
#[inline]
pub fn len(&self) -> usize {
self.cursor
}
#[inline]
pub fn is_empty(&self) -> bool {
self.cursor == 0
}
pub fn finish(self) -> Result<CommittedColumn, FFIError> {
if let Some(d) = &self.defined {
self.inner.set_defined(d);
}
self.inner.commit(self.cursor)
}
}
pub struct BoolWriter<'a> {
inner: ColumnBuilder<'a>,
values: Vec<bool>,
defined: Option<Vec<bool>>,
}
impl<'a> BoolWriter<'a> {
fn new(inner: ColumnBuilder<'a>, capacity: usize) -> Self {
Self {
inner,
values: Vec::with_capacity(capacity),
defined: None,
}
}
#[inline]
pub fn push(&mut self, v: bool) {
self.values.push(v);
if let Some(d) = self.defined.as_mut() {
d.push(true);
}
}
#[inline]
pub fn push_none(&mut self) {
self.values.push(false);
let d = self.defined.get_or_insert_with(|| vec![true; self.values.len() - 1]);
d.push(false);
}
#[inline]
pub fn len(&self) -> usize {
self.values.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
pub fn finish(self) -> Result<CommittedColumn, FFIError> {
if let Some(d) = &self.defined {
self.inner.set_defined(d);
}
self.inner.write_bool(&self.values)
}
}
pub struct VarLenWriter<'a> {
inner: ColumnBuilder<'a>,
item_cursor: usize,
byte_cursor: usize,
data_capacity: usize,
capacity: usize,
defined: Option<Vec<bool>>,
type_code: ColumnTypeCode,
}
impl<'a> VarLenWriter<'a> {
fn new(inner: ColumnBuilder<'a>, capacity: usize, expected_bytes: usize) -> Result<Self, FFIError> {
let type_code = inner.type_code();
debug_assert!(
matches!(type_code, ColumnTypeCode::Utf8 | ColumnTypeCode::Blob | ColumnTypeCode::Decimal),
"VarLenWriter requires Utf8, Blob, or Decimal",
);
let initial = expected_bytes.max(capacity);
if initial > 0 {
inner.grow(initial)?;
}
unsafe {
core::ptr::write(inner.offsets_ptr(), 0u64);
}
Ok(Self {
inner,
item_cursor: 0,
byte_cursor: 0,
data_capacity: initial,
capacity,
defined: None,
type_code,
})
}
#[inline]
fn ensure_capacity(&mut self, need: usize) -> Result<(), FFIError> {
if self.byte_cursor + need <= self.data_capacity {
return Ok(());
}
let extra = (self.byte_cursor + need - self.data_capacity).max(self.data_capacity.max(64));
self.inner.grow(extra)?;
self.data_capacity += extra;
Ok(())
}
#[inline]
fn push_bytes_internal(&mut self, bytes: &[u8]) -> Result<(), FFIError> {
debug_assert!(self.item_cursor < self.capacity, "VarLenWriter::push past capacity");
self.ensure_capacity(bytes.len())?;
unsafe {
let data = self.inner.data_ptr();
let offsets = self.inner.offsets_ptr();
if !bytes.is_empty() {
core::ptr::copy_nonoverlapping(bytes.as_ptr(), data.add(self.byte_cursor), bytes.len());
}
self.byte_cursor += bytes.len();
core::ptr::write(offsets.add(self.item_cursor + 1), self.byte_cursor as u64);
}
if let Some(d) = self.defined.as_mut() {
d.push(true);
}
self.item_cursor += 1;
Ok(())
}
pub fn push_str(&mut self, s: &str) -> Result<(), FFIError> {
debug_assert_eq!(self.type_code, ColumnTypeCode::Utf8);
self.push_bytes_internal(s.as_bytes())
}
pub fn push_bytes(&mut self, b: &[u8]) -> Result<(), FFIError> {
debug_assert!(matches!(self.type_code, ColumnTypeCode::Blob | ColumnTypeCode::Decimal));
self.push_bytes_internal(b)
}
pub fn push_none(&mut self) -> Result<(), FFIError> {
debug_assert!(self.item_cursor < self.capacity, "VarLenWriter::push_none past capacity");
unsafe {
let offsets = self.inner.offsets_ptr();
core::ptr::write(offsets.add(self.item_cursor + 1), self.byte_cursor as u64);
}
let d = self.defined.get_or_insert_with(|| vec![true; self.item_cursor]);
d.push(false);
self.item_cursor += 1;
Ok(())
}
#[inline]
pub fn len(&self) -> usize {
self.item_cursor
}
#[inline]
pub fn is_empty(&self) -> bool {
self.item_cursor == 0
}
pub fn finish(self) -> Result<CommittedColumn, FFIError> {
if let Some(d) = &self.defined {
self.inner.set_defined(d);
}
self.inner.commit(self.item_cursor)
}
}
pub type U8Writer<'a> = ScalarWriter<'a, u8>;
pub type U16Writer<'a> = ScalarWriter<'a, u16>;
pub type U32Writer<'a> = ScalarWriter<'a, u32>;
pub type U64Writer<'a> = ScalarWriter<'a, u64>;
pub type U128Writer<'a> = ScalarWriter<'a, u128>;
pub type I8Writer<'a> = ScalarWriter<'a, i8>;
pub type I16Writer<'a> = ScalarWriter<'a, i16>;
pub type I32Writer<'a> = ScalarWriter<'a, i32>;
pub type I64Writer<'a> = ScalarWriter<'a, i64>;
pub type I128Writer<'a> = ScalarWriter<'a, i128>;
pub type F32Writer<'a> = ScalarWriter<'a, f32>;
pub type F64Writer<'a> = ScalarWriter<'a, f64>;
pub type Utf8Writer<'a> = VarLenWriter<'a>;
pub type BlobWriter<'a> = VarLenWriter<'a>;
pub type DecimalWriter<'a> = VarLenWriter<'a>;
impl<'a> ColumnsBuilder<'a> {
pub fn u8_writer(&mut self, capacity: usize) -> Result<U8Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Uint1, capacity)?, capacity))
}
pub fn u16_writer(&mut self, capacity: usize) -> Result<U16Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Uint2, capacity)?, capacity))
}
pub fn u32_writer(&mut self, capacity: usize) -> Result<U32Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Uint4, capacity)?, capacity))
}
pub fn u64_writer(&mut self, capacity: usize) -> Result<U64Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Uint8, capacity)?, capacity))
}
pub fn u128_writer(&mut self, capacity: usize) -> Result<U128Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Uint16, capacity)?, capacity))
}
pub fn i8_writer(&mut self, capacity: usize) -> Result<I8Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Int1, capacity)?, capacity))
}
pub fn i16_writer(&mut self, capacity: usize) -> Result<I16Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Int2, capacity)?, capacity))
}
pub fn i32_writer(&mut self, capacity: usize) -> Result<I32Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Int4, capacity)?, capacity))
}
pub fn i64_writer(&mut self, capacity: usize) -> Result<I64Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Int8, capacity)?, capacity))
}
pub fn i128_writer(&mut self, capacity: usize) -> Result<I128Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Int16, capacity)?, capacity))
}
pub fn f32_writer(&mut self, capacity: usize) -> Result<F32Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Float4, capacity)?, capacity))
}
pub fn f64_writer(&mut self, capacity: usize) -> Result<F64Writer<'_>, FFIError> {
Ok(ScalarWriter::new(self.acquire(ColumnTypeCode::Float8, capacity)?, capacity))
}
pub fn bool_writer(&mut self, capacity: usize) -> Result<BoolWriter<'_>, FFIError> {
Ok(BoolWriter::new(self.acquire(ColumnTypeCode::Bool, capacity)?, capacity))
}
pub fn utf8_writer(&mut self, capacity: usize, expected_bytes: usize) -> Result<Utf8Writer<'_>, FFIError> {
VarLenWriter::new(self.acquire(ColumnTypeCode::Utf8, capacity)?, capacity, expected_bytes)
}
pub fn blob_writer(&mut self, capacity: usize, expected_bytes: usize) -> Result<BlobWriter<'_>, FFIError> {
VarLenWriter::new(self.acquire(ColumnTypeCode::Blob, capacity)?, capacity, expected_bytes)
}
pub fn decimal_writer(
&mut self,
capacity: usize,
expected_bytes: usize,
) -> Result<DecimalWriter<'_>, FFIError> {
VarLenWriter::new(self.acquire(ColumnTypeCode::Decimal, capacity)?, capacity, expected_bytes)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
use reifydb_core::interface::catalog::flow::FlowNodeId;
use reifydb_type::value::{Value, decimal::Decimal, row_number::RowNumber};
use crate::{
error::Result,
operator::{
FFIOperator, FFIOperatorMetadata,
change::BorrowedChange,
column::{batch::InsertBatch, operator::OperatorColumn},
context::OperatorContext,
},
row,
testing::{builders::TestChangeBuilder, harness::TestHarnessBuilder},
};
struct U8Row {
v: u8,
}
row!(U8Row {
v: u8
});
struct OpU8;
impl FFIOperatorMetadata for OpU8 {
const NAME: &'static str = "writer_u8";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpU8 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<U8Row>::new(ctx, 3)?;
for (i, &v) in [0u8, 1, u8::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&U8Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_u8_roundtrip() {
let mut h = TestHarnessBuilder::<OpU8>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").u8("v"), Some(0));
assert_eq!(post.row_ref(1).expect("r1").u8("v"), Some(1));
assert_eq!(post.row_ref(2).expect("r2").u8("v"), Some(u8::MAX));
}
struct U16Row {
v: u16,
}
row!(U16Row {
v: u16
});
struct OpU16;
impl FFIOperatorMetadata for OpU16 {
const NAME: &'static str = "writer_u16";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpU16 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<U16Row>::new(ctx, 3)?;
for (i, &v) in [0u16, 1, u16::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&U16Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_u16_roundtrip() {
let mut h = TestHarnessBuilder::<OpU16>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").u16("v"), Some(0));
assert_eq!(post.row_ref(1).expect("r1").u16("v"), Some(1));
assert_eq!(post.row_ref(2).expect("r2").u16("v"), Some(u16::MAX));
}
struct U32Row {
v: u32,
}
row!(U32Row {
v: u32
});
struct OpU32;
impl FFIOperatorMetadata for OpU32 {
const NAME: &'static str = "writer_u32";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpU32 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<U32Row>::new(ctx, 3)?;
for (i, &v) in [0u32, 1, u32::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&U32Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_u32_roundtrip() {
let mut h = TestHarnessBuilder::<OpU32>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").u32("v"), Some(0));
assert_eq!(post.row_ref(1).expect("r1").u32("v"), Some(1));
assert_eq!(post.row_ref(2).expect("r2").u32("v"), Some(u32::MAX));
}
struct U64Row {
v: u64,
}
row!(U64Row {
v: u64
});
struct OpU64;
impl FFIOperatorMetadata for OpU64 {
const NAME: &'static str = "writer_u64";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpU64 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<U64Row>::new(ctx, 3)?;
for (i, &v) in [0u64, 1, u64::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&U64Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_u64_roundtrip() {
let mut h = TestHarnessBuilder::<OpU64>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").u64("v"), Some(0));
assert_eq!(post.row_ref(1).expect("r1").u64("v"), Some(1));
assert_eq!(post.row_ref(2).expect("r2").u64("v"), Some(u64::MAX));
}
struct I8Row {
v: i8,
}
row!(I8Row {
v: i8
});
struct OpI8;
impl FFIOperatorMetadata for OpI8 {
const NAME: &'static str = "writer_i8";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpI8 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<I8Row>::new(ctx, 3)?;
for (i, &v) in [i8::MIN, 0_i8, i8::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&I8Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_i8_roundtrip() {
let mut h = TestHarnessBuilder::<OpI8>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").i8("v"), Some(i8::MIN));
assert_eq!(post.row_ref(1).expect("r1").i8("v"), Some(0));
assert_eq!(post.row_ref(2).expect("r2").i8("v"), Some(i8::MAX));
}
struct I16Row {
v: i16,
}
row!(I16Row {
v: i16
});
struct OpI16;
impl FFIOperatorMetadata for OpI16 {
const NAME: &'static str = "writer_i16";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpI16 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<I16Row>::new(ctx, 3)?;
for (i, &v) in [i16::MIN, 0_i16, i16::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&I16Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_i16_roundtrip() {
let mut h = TestHarnessBuilder::<OpI16>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").i16("v"), Some(i16::MIN));
assert_eq!(post.row_ref(1).expect("r1").i16("v"), Some(0));
assert_eq!(post.row_ref(2).expect("r2").i16("v"), Some(i16::MAX));
}
struct I32Row {
v: i32,
}
row!(I32Row {
v: i32
});
struct OpI32;
impl FFIOperatorMetadata for OpI32 {
const NAME: &'static str = "writer_i32";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpI32 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<I32Row>::new(ctx, 3)?;
for (i, &v) in [i32::MIN, 0_i32, i32::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&I32Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_i32_roundtrip() {
let mut h = TestHarnessBuilder::<OpI32>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").i32("v"), Some(i32::MIN));
assert_eq!(post.row_ref(1).expect("r1").i32("v"), Some(0));
assert_eq!(post.row_ref(2).expect("r2").i32("v"), Some(i32::MAX));
}
struct I64Row {
v: i64,
}
row!(I64Row {
v: i64
});
struct OpI64;
impl FFIOperatorMetadata for OpI64 {
const NAME: &'static str = "writer_i64";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpI64 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<I64Row>::new(ctx, 3)?;
for (i, &v) in [i64::MIN, 0_i64, i64::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&I64Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_i64_roundtrip() {
let mut h = TestHarnessBuilder::<OpI64>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").i64("v"), Some(i64::MIN));
assert_eq!(post.row_ref(1).expect("r1").i64("v"), Some(0));
assert_eq!(post.row_ref(2).expect("r2").i64("v"), Some(i64::MAX));
}
struct F32Row {
v: f32,
}
row!(F32Row {
v: f32
});
struct OpF32;
impl FFIOperatorMetadata for OpF32 {
const NAME: &'static str = "writer_f32";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpF32 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<F32Row>::new(ctx, 3)?;
for (i, &v) in [0.0_f32, -1.5_f32, f32::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&F32Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_f32_roundtrip() {
let mut h = TestHarnessBuilder::<OpF32>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").f32("v"), Some(0.0_f32));
assert_eq!(post.row_ref(1).expect("r1").f32("v"), Some(-1.5_f32));
assert_eq!(post.row_ref(2).expect("r2").f32("v"), Some(f32::MAX));
}
struct F64Row {
v: f64,
}
row!(F64Row {
v: f64
});
struct OpF64;
impl FFIOperatorMetadata for OpF64 {
const NAME: &'static str = "writer_f64";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpF64 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<F64Row>::new(ctx, 3)?;
for (i, &v) in [0.0_f64, -1.5_f64, f64::MAX].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&F64Row {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn scalar_f64_roundtrip() {
let mut h = TestHarnessBuilder::<OpF64>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").f64("v"), Some(0.0_f64));
assert_eq!(post.row_ref(1).expect("r1").f64("v"), Some(-1.5_f64));
assert_eq!(post.row_ref(2).expect("r2").f64("v"), Some(f64::MAX));
}
struct BoolRow {
v: bool,
}
row!(BoolRow {
v: bool
});
struct OpBool;
impl FFIOperatorMetadata for OpBool {
const NAME: &'static str = "writer_bool";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpBool {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<BoolRow>::new(ctx, 3)?;
for (i, &v) in [true, false, true].iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&BoolRow {
v,
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn bool_roundtrip() {
let mut h = TestHarnessBuilder::<OpBool>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").bool("v"), Some(true));
assert_eq!(post.row_ref(1).expect("r1").bool("v"), Some(false));
assert_eq!(post.row_ref(2).expect("r2").bool("v"), Some(true));
}
struct Utf8Row {
s: String,
}
row!(Utf8Row {
s: String
});
struct OpUtf8;
impl FFIOperatorMetadata for OpUtf8 {
const NAME: &'static str = "writer_utf8";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpUtf8 {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let values = ["", "hello", "こんにちは"];
let mut batch = InsertBatch::<Utf8Row>::new(ctx, values.len())?;
for (i, &s) in values.iter().enumerate() {
batch.push(
RowNumber(i as u64 + 1),
&Utf8Row {
s: s.to_string(),
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn utf8_roundtrip() {
let mut h = TestHarnessBuilder::<OpUtf8>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").utf8("s").as_deref(), Some(""));
assert_eq!(post.row_ref(1).expect("r1").utf8("s").as_deref(), Some("hello"));
assert_eq!(post.row_ref(2).expect("r2").utf8("s").as_deref(), Some("こんにちは"));
}
struct OpUtf8Growth;
impl FFIOperatorMetadata for OpUtf8Growth {
const NAME: &'static str = "writer_utf8_growth";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpUtf8Growth {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<Utf8Row>::new(ctx, 20)?;
for i in 0..20u64 {
batch.push(
RowNumber(i + 1),
&Utf8Row {
s: "x".repeat(100),
},
)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn utf8_capacity_growth() {
let mut h = TestHarnessBuilder::<OpUtf8Growth>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 20);
let expected = "x".repeat(100);
for i in 0..20usize {
assert_eq!(
post.row_ref(i).expect("row").utf8("s").as_deref(),
Some(expected.as_str()),
"row {i}"
);
}
}
struct BlobRow {
b: Vec<u8>,
}
row!(BlobRow { b: Vec<u8> });
struct OpBlob;
impl FFIOperatorMetadata for OpBlob {
const NAME: &'static str = "writer_blob";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpBlob {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let rows = [
BlobRow {
b: vec![],
},
BlobRow {
b: vec![0u8, 1, 127, 255],
},
BlobRow {
b: vec![42u8; 1000],
},
];
let mut batch = InsertBatch::<BlobRow>::new(ctx, rows.len())?;
for (i, row) in rows.iter().enumerate() {
batch.push(RowNumber(i as u64 + 1), row)?;
}
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn blob_roundtrip() {
let mut h = TestHarnessBuilder::<OpBlob>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").blob("b"), Some(vec![]));
assert_eq!(post.row_ref(1).expect("r1").blob("b"), Some(vec![0u8, 1, 127, 255]));
assert_eq!(post.row_ref(2).expect("r2").blob("b"), Some(vec![42u8; 1000]));
}
struct DecimalRow {
d: Decimal,
}
row!(DecimalRow {
d: Decimal
});
struct OpDecimal;
impl FFIOperatorMetadata for OpDecimal {
const NAME: &'static str = "writer_decimal";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpDecimal {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<DecimalRow>::new(ctx, 3)?;
batch.push(
RowNumber(1),
&DecimalRow {
d: Decimal::zero(),
},
)?;
batch.push(
RowNumber(2),
&DecimalRow {
d: Decimal::from_i64(1234),
},
)?;
batch.push(
RowNumber(3),
&DecimalRow {
d: Decimal::from_i64(-5678),
},
)?;
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn decimal_roundtrip() {
let mut h = TestHarnessBuilder::<OpDecimal>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 3);
assert_eq!(post.row_ref(0).expect("r0").decimal("d"), Some(Decimal::zero()));
assert_eq!(post.row_ref(1).expect("r1").decimal("d"), Some(Decimal::from_i64(1234)));
assert_eq!(post.row_ref(2).expect("r2").decimal("d"), Some(Decimal::from_i64(-5678)));
}
struct WideRow {
a: u128,
b: i128,
}
row!(WideRow {
a: u128,
b: i128
});
struct OpWide;
impl FFIOperatorMetadata for OpWide {
const NAME: &'static str = "writer_wide";
const API: u32 = 1;
const VERSION: &'static str = "1.0.0";
const DESCRIPTION: &'static str = "test fixture";
const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for OpWide {
fn new(_: FlowNodeId, _: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, _: BorrowedChange<'_>) -> Result<()> {
let mut batch = InsertBatch::<WideRow>::new(ctx, 1)?;
batch.push(
RowNumber(1),
&WideRow {
a: u128::MAX,
b: i128::MIN,
},
)?;
batch.finish()
}
fn pull(&mut self, _: &mut OperatorContext, _: &[RowNumber]) -> Result<()> {
Ok(())
}
}
#[test]
fn wide_integers_roundtrip() {
let mut h = TestHarnessBuilder::<OpWide>::new().build().expect("harness");
let out = h.apply(TestChangeBuilder::new().build()).expect("apply");
let post = out.diffs[0].post().expect("post");
assert_eq!(post.row_count(), 1);
assert_eq!(post.row_ref(0).expect("r0").u128("a"), Some(u128::MAX));
assert_eq!(post.row_ref(0).expect("r0").i128("b"), Some(i128::MIN));
}
}