use arrow::{datatypes::Schema, record_batch::RecordBatch};
use crate::{
index::{IndexOps, WindowIndex},
prelude::*,
stream::operator::window::WindowContext,
table::{to_record_batches, ImmutableTable, RawRecordBatch},
util::ArconFnBounds,
};
use arcon_state::{backend::handles::ActiveHandle, Backend, VecState};
use std::marker::PhantomData;
pub struct ArrowWindow<IN, OUT, F, B>
where
IN: ArconType + ToArrow,
OUT: ArconType,
F: Fn(Arc<Schema>, Vec<RecordBatch>) -> ArconResult<OUT> + ArconFnBounds,
B: Backend,
{
handle: ActiveHandle<B, VecState<RawRecordBatch>, u64, u64>,
map: std::collections::HashMap<WindowContext, MutableTable>,
udf: F,
_marker: std::marker::PhantomData<IN>,
}
impl<IN, OUT, F, B> ArrowWindow<IN, OUT, F, B>
where
    IN: ArconType + ToArrow,
    OUT: ArconType,
    F: Fn(Arc<Schema>, Vec<RecordBatch>) -> ArconResult<OUT> + ArconFnBounds,
    B: Backend,
{
    /// Creates a window index that evaluates `udf`, storing state in `backend`.
    ///
    /// A vector handle named `"window_handle"` is built with item key `0` and
    /// namespace `0`, registered with the backend, and then activated. The
    /// in-memory table map starts out empty.
    pub fn new(backend: Arc<B>, udf: F) -> Self {
        let mut inactive = Handle::vec("window_handle")
            .with_item_key(0)
            .with_namespace(0);
        // Registration has to happen before activation consumes the backend.
        backend.register_vec_handle(&mut inactive);

        Self {
            handle: inactive.activate(backend),
            map: std::collections::HashMap::new(),
            udf,
            _marker: PhantomData,
        }
    }
}
impl<IN, OUT, F, B> WindowIndex for ArrowWindow<IN, OUT, F, B>
where
    IN: ArconType + ToArrow,
    OUT: ArconType,
    F: Fn(Arc<Schema>, Vec<RecordBatch>) -> ArconResult<OUT> + ArconFnBounds,
    B: Backend,
{
    type IN = IN;
    type OUT = OUT;

    /// Appends `element` to the in-memory table of window `ctx`.
    ///
    /// The table is created with `IN::table` the first time a window is seen.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the table's `append`.
    fn on_element(&mut self, element: Self::IN, ctx: WindowContext) -> ArconResult<()> {
        self.map
            .entry(ctx)
            .or_insert_with(IN::table)
            .append(element, None)?;
        Ok(())
    }

    /// Runs the user-defined function over every batch of window `ctx`.
    ///
    /// The vector passed to the UDF holds the batches taken from the
    /// in-memory table first, followed by the batches read back from the
    /// state handle under item key `ctx.key` and namespace `ctx.index`.
    /// If no table exists for `ctx` yet, one is created via `IN::table`.
    ///
    /// # Errors
    ///
    /// Propagates errors from collecting the table's batches, reading the
    /// state handle, converting the raw batches, or from the UDF itself.
    fn result(&mut self, ctx: WindowContext) -> ArconResult<Self::OUT> {
        let table = self.map.entry(ctx).or_insert_with(IN::table);
        self.handle.set_item_key(ctx.key);
        self.handle.set_namespace(ctx.index);

        let mut batches = table.batches()?;
        let raw_batches = self.handle.get()?;

        // Build the schema once and share it through the `Arc` rather than
        // constructing a second identical schema for the UDF call.
        let schema = Arc::new(IN::schema());
        let mut restored = to_record_batches(Arc::clone(&schema), raw_batches)?;
        batches.append(&mut restored);

        (self.udf)(schema, batches)
    }

    /// Drops all data held for window `ctx`.
    ///
    /// Removes the in-memory table (if any) and clears the state handle entry
    /// under item key `ctx.key` and namespace `ctx.index`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by clearing the state handle.
    fn clear(&mut self, ctx: WindowContext) -> ArconResult<()> {
        // The removed table is not needed; dropping it is the point.
        self.map.remove(&ctx);
        self.handle.set_item_key(ctx.key);
        self.handle.set_namespace(ctx.index);
        self.handle.clear()?;
        Ok(())
    }
}
impl<IN, OUT, F, B> IndexOps for ArrowWindow<IN, OUT, F, B>
where
    IN: ArconType + ToArrow,
    OUT: ArconType,
    F: Fn(Arc<Schema>, Vec<RecordBatch>) -> ArconResult<OUT> + ArconFnBounds,
    B: Backend,
{
    /// Appends the raw batches of every in-memory table to the state handle.
    ///
    /// For each window the handle is pointed at item key `key` and namespace
    /// `index` of that window's context before its batches are appended.
    /// Entries stay in the in-memory map afterwards.
    ///
    /// NOTE(review): presumably `raw_batches` drains the table, otherwise
    /// repeated calls would append the same batches again — confirm.
    ///
    /// # Errors
    ///
    /// Stops at and propagates the first error from `raw_batches` or from
    /// appending to the state handle.
    fn persist(&mut self) -> ArconResult<()> {
        for (window, table) in &mut self.map {
            self.handle.set_item_key(window.key);
            self.handle.set_namespace(window.index);
            for raw in table.raw_batches()? {
                self.handle.append(raw)?;
            }
        }
        Ok(())
    }

    /// No-op: the key argument is ignored.
    fn set_key(&mut self, _: u64) {}

    /// Always returns `Ok(None)`; this index does not expose a table.
    fn table(&mut self) -> ArconResult<Option<ImmutableTable>> {
        Ok(None)
    }
}