use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use crate::errors::{Result, RpcError};
use crate::log::{LogLevel, LogMessage};
use crate::wire::Metadata;
/// One item queued on an `OutputCollector`: either a record batch or a log message.
pub(crate) enum Emitted {
/// A record batch, optionally accompanied by metadata.
Batch {
/// The batch that was emitted.
batch: RecordBatch,
/// `Some` when queued through `emit_with_metadata`, `None` when queued through `emit`.
metadata: Option<Metadata>,
},
/// A log message queued through `client_log` or `client_log_with`.
Log(LogMessage),
}
/// Accumulates emitted record batches and log messages against a fixed schema.
pub struct OutputCollector {
/// Schema that every emitted batch is compared against.
schema: SchemaRef,
/// Queued batches and log messages, in the order they were pushed.
pub(crate) items: Vec<Emitted>,
/// Set to `true` by `finish()`; none of the methods in this impl reset it.
finished: bool,
/// Flag supplied at construction and reported back by `is_producer()`.
is_producer: bool,
}
impl OutputCollector {
pub(crate) fn new(schema: SchemaRef, is_producer: bool) -> Self {
Self {
schema,
items: Vec::new(),
finished: false,
is_producer,
}
}
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
pub fn emit(&mut self, batch: RecordBatch) -> Result<()> {
if batch.schema() != self.schema {
return Err(RpcError::runtime_error(format!(
"emit(): schema mismatch — expected {:?}, got {:?}",
self.schema.fields(),
batch.schema().fields()
)));
}
self.items.push(Emitted::Batch {
batch,
metadata: None,
});
Ok(())
}
pub fn emit_with_metadata(&mut self, batch: RecordBatch, metadata: Metadata) -> Result<()> {
if batch.schema() != self.schema {
return Err(RpcError::runtime_error(format!(
"emit_with_metadata(): schema mismatch — expected {:?}, got {:?}",
self.schema.fields(),
batch.schema().fields()
)));
}
self.items.push(Emitted::Batch {
batch,
metadata: Some(metadata),
});
Ok(())
}
pub fn finish(&mut self) {
self.finished = true;
}
pub fn finished(&self) -> bool {
self.finished
}
pub fn client_log(&mut self, level: LogLevel, message: impl Into<String>) {
self.items
.push(Emitted::Log(LogMessage::new(level, message)));
}
pub fn client_log_with(&mut self, msg: LogMessage) {
self.items.push(Emitted::Log(msg));
}
pub fn is_producer(&self) -> bool {
self.is_producer
}
}
/// State object behind a producer-style stream.
///
/// NOTE(review): the code that drives these callbacks is not in this file;
/// the descriptions of when each one runs are inferred from the names and
/// should be confirmed against the server dispatch code.
pub trait ProducerState: Send {
    /// Writes output into `out` for the call described by `ctx`.
    fn produce(&mut self, out: &mut OutputCollector, ctx: &CallContext) -> Result<()>;

    /// Cancellation hook; the default does nothing.
    fn on_cancel(&mut self, _ctx: &CallContext) {}

    /// Optional limit reported by the state; the default is `None`.
    ///
    /// NOTE(review): presumably caps the number of batches per step — confirm
    /// with the caller.
    fn batch_limit(&self) -> Option<usize> {
        None
    }

    /// Serializes the state to bytes.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a runtime error that asks
    /// the implementor to override this method or register the method via
    /// `MethodInfo::stream_with_codec`.
    fn encode_state(&self) -> Result<Vec<u8>> {
        let reason = "producer state does not implement encode_state(); \
                      override this method or register the method via MethodInfo::stream_with_codec";
        Err(RpcError::runtime_error(reason))
    }
}
/// State object behind an exchange-style stream, which receives an input
/// batch on every call.
///
/// NOTE(review): the code that drives these callbacks is not in this file;
/// confirm the exact call sequence against the server dispatch code.
pub trait ExchangeState: Send {
    /// Handles one `input` batch, writing any output into `out`.
    fn exchange(
        &mut self,
        input: &RecordBatch,
        out: &mut OutputCollector,
        ctx: &CallContext,
    ) -> Result<()>;

    /// Cancellation hook; the default does nothing.
    fn on_cancel(&mut self, _ctx: &CallContext) {}

    /// Serializes the state to bytes.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a runtime error that asks
    /// the implementor to override this method or register the method via
    /// `MethodInfo::stream_with_codec`.
    fn encode_state(&self) -> Result<Vec<u8>> {
        let reason = "exchange state does not implement encode_state(); \
                      override this method or register the method via MethodInfo::stream_with_codec";
        Err(RpcError::runtime_error(reason))
    }
}
/// Description of a stream: its schemas, its state object, and an optional header batch.
pub struct StreamResult {
/// Schema of the stream's output.
pub output_schema: SchemaRef,
/// Input schema; `StreamResult::exchange` sets it to `Some`, `StreamResult::producer` to `None`.
pub input_schema: Option<SchemaRef>,
/// The producer or exchange state object for the stream.
pub state: StreamStateKind,
/// Optional header batch; `None` unless set, e.g. through `with_header`.
pub header: Option<RecordBatch>,
/// Optional metadata for the header; the constructors in this file leave it `None`.
pub header_metadata: Option<Metadata>,
}
/// The two kinds of boxed stream state a `StreamResult` can hold.
pub enum StreamStateKind {
/// A boxed `ProducerState` implementation.
Producer(Box<dyn ProducerState>),
/// A boxed `ExchangeState` implementation.
Exchange(Box<dyn ExchangeState>),
}
impl StreamResult {
    /// Creates a producer stream result: no input schema, no header, and
    /// `state` wrapped as [`StreamStateKind::Producer`].
    pub fn producer(schema: SchemaRef, state: Box<dyn ProducerState>) -> Self {
        let state = StreamStateKind::Producer(state);
        Self {
            output_schema: schema,
            input_schema: None,
            state,
            header: None,
            header_metadata: None,
        }
    }

    /// Creates an exchange stream result with both schemas set, no header,
    /// and `state` wrapped as [`StreamStateKind::Exchange`].
    pub fn exchange(
        output_schema: SchemaRef,
        input_schema: SchemaRef,
        state: Box<dyn ExchangeState>,
    ) -> Self {
        let input_schema = Some(input_schema);
        let state = StreamStateKind::Exchange(state);
        Self {
            output_schema,
            input_schema,
            state,
            header: None,
            header_metadata: None,
        }
    }

    /// Returns `self` with its header replaced by `header`.
    ///
    /// `header_metadata` is left as it was.
    pub fn with_header(self, header: RecordBatch) -> Self {
        let mut updated = self;
        updated.header = Some(header);
        updated
    }
}
/// Builds a state decoder that rebuilds an `S` from bytes via `S::decode`
/// and wraps it as [`StreamStateKind::Producer`].
///
/// Decode failures are propagated to the decoder's caller.
#[cfg(feature = "http")]
#[doc(hidden)]
pub fn producer_decoder<S>() -> crate::server::StateDecoder
where
    S: ProducerState + crate::stream_codec::StreamStateCodec + 'static,
{
    Arc::new(|bytes: &[u8]| {
        let state = S::decode(bytes)?;
        Ok(StreamStateKind::Producer(Box::new(state)))
    })
}
/// Builds a state decoder that rebuilds an `S` from bytes via `S::decode`
/// and wraps it as [`StreamStateKind::Exchange`].
///
/// Decode failures are propagated to the decoder's caller.
#[cfg(feature = "http")]
#[doc(hidden)]
pub fn exchange_decoder<S>() -> crate::server::StateDecoder
where
    S: ExchangeState + crate::stream_codec::StreamStateCodec + 'static,
{
    Arc::new(|bytes: &[u8]| {
        let state = S::decode(bytes)?;
        Ok(StreamStateKind::Exchange(Box::new(state)))
    })
}
/// Returns a freshly allocated shared handle to a schema built with `Schema::empty()`.
pub(crate) fn empty_schema() -> SchemaRef {
    let schema = Schema::empty();
    Arc::new(schema)
}
pub use crate::server::CallContext;