use std::any::Any;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};
use anyhow::Result as AnyResult;
#[cfg(feature = "with-avro")]
use apache_avro::{
Schema as AvroSchema,
schema::{Name as AvroName, NamesRef},
types::Value as AvroValue,
};
use arrow::record_batch::RecordBatch;
use dbsp::circuit::NodeId;
use dbsp::dynamic::{ClonableTrait, DynData, DynVec, Erase, Factory};
use dbsp::operator::StagedBuffers;
use dyn_clone::DynClone;
use feldera_sqllib::Variant;
use feldera_types::format::csv::CsvParserConfig;
use feldera_types::format::json::JsonFlavor;
use feldera_types::program_schema::{Relation, SqlIdentifier};
use feldera_types::serde_with_context::SqlSerdeConfig;
use serde_arrow::ArrayBuilder;
#[cfg(feature = "with-avro")]
use std::collections::HashMap;
use crate::errors::controller::ControllerError;
use crate::format::InputBuffer;
use crate::preprocess::PreprocessorRegistry;
#[derive(Clone)]
pub enum RecordFormat {
Json(JsonFlavor),
Csv(CsvParserConfig),
Parquet(SqlSerdeConfig),
#[cfg(feature = "with-avro")]
Avro,
Raw(String),
}
pub trait DeCollectionStream: Send + Sync + InputBuffer {
fn insert(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
fn delete(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
fn update(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
fn reserve(&mut self, reservation: usize);
fn truncate(&mut self, len: usize);
fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
fn fork(&self) -> Box<dyn DeCollectionStream>;
}
pub trait ArrowStream: InputBuffer + Send + Sync {
fn insert(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;
fn delete(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;
fn insert_with_polarities(
&mut self,
data: &RecordBatch,
polarities: &[bool],
metadata: &Option<Variant>,
) -> AnyResult<()>;
fn fork(&self) -> Box<dyn ArrowStream>;
fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
}
#[cfg(feature = "with-avro")]
pub type AvroSchemaRefs = HashMap<AvroName, AvroSchema>;
#[cfg(feature = "with-avro")]
pub trait AvroStream: InputBuffer + Send + Sync {
fn insert(
&mut self,
data: &AvroValue,
schema: &AvroSchema,
refs: &AvroSchemaRefs,
n_bytes: usize,
metadata: &Option<Variant>,
) -> AnyResult<()>;
fn delete(
&mut self,
data: &AvroValue,
schema: &AvroSchema,
refs: &AvroSchemaRefs,
n_bytes: usize,
metadata: &Option<Variant>,
) -> AnyResult<()>;
fn fork(&self) -> Box<dyn AvroStream>;
fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
}
pub trait DeCollectionHandle: Send + Sync {
fn configure_deserializer(
&self,
record_format: RecordFormat,
) -> Result<Box<dyn DeCollectionStream>, ControllerError>;
fn configure_arrow_deserializer(
&self,
config: SqlSerdeConfig,
) -> Result<Box<dyn ArrowStream>, ControllerError>;
#[cfg(feature = "with-avro")]
fn configure_avro_deserializer(&self) -> Result<Box<dyn AvroStream>, ControllerError>;
fn fork(&self) -> Box<dyn DeCollectionHandle>;
}
pub trait SerBatchReader: 'static + Send + Sync {
fn key_count(&self) -> usize;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
fn cursor<'a>(
&'a self,
record_format: RecordFormat,
) -> Result<Box<dyn SerCursor + Send + 'a>, ControllerError>;
fn batches(&self) -> Vec<Arc<dyn SerBatch>>;
fn snapshot(&self) -> Arc<dyn SerBatchReader>;
fn keys_factory(&self) -> &'static dyn Factory<DynVec<DynData>>;
fn key_factory(&self) -> &'static dyn Factory<DynData>;
fn sample_keys(&self, sample_size: usize, sample: &mut DynVec<DynData>);
fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<DynData>);
}
impl Debug for dyn SerBatchReader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut cursor = self
.cursor(RecordFormat::Json(Default::default()))
.map_err(|_| std::fmt::Error)?;
let mut key = Vec::new();
let mut val = Vec::new();
while cursor.key_valid() {
cursor
.serialize_key(&mut key)
.map_err(|_| std::fmt::Error)?;
write!(f, "{}=>{{", String::from_utf8_lossy(&key))?;
while cursor.val_valid() {
cursor
.serialize_val(&mut val)
.map_err(|_| std::fmt::Error)?;
write!(
f,
"{}=>{}, ",
String::from_utf8_lossy(&val),
cursor.weight()
)?;
val.clear();
cursor.step_val();
}
write!(f, "}}, ")?;
key.clear();
cursor.step_key();
}
Ok(())
}
}
impl Debug for dyn SerBatch {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.as_batch_reader().fmt(f)
}
}
pub trait SerBatch: SerBatchReader {
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Sync + Send>;
fn merge(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatch>;
fn concat(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatchReader>;
fn as_batch_reader(&self) -> &dyn SerBatchReader;
fn arc_as_batch_reader(self: Arc<Self>) -> Arc<dyn SerBatchReader>;
fn into_trace(self: Arc<Self>) -> Box<dyn SerTrace>;
}
pub trait SerTrace: SerBatchReader {
fn insert(&mut self, batch: Arc<dyn SerBatch>);
fn as_batch_reader(&self) -> &dyn SerBatchReader;
}
#[doc(hidden)]
pub struct SplitCursorBuilder {
batch: Arc<dyn SerBatchReader>,
start_key: Box<DynData>,
end_key: Option<Box<DynData>>,
format: RecordFormat,
}
impl SplitCursorBuilder {
pub fn from_bounds(
batch: Arc<dyn SerBatchReader>,
bounds: &DynVec<DynData>,
index: usize,
format: RecordFormat,
) -> Option<Self> {
let start_bound = if index == 0 {
None
} else if index <= bounds.len() {
Some(bounds.index(index - 1).as_data())
} else {
None
};
let end_bound = if index < bounds.len() {
Some(bounds.index(index).as_data())
} else {
None
};
let start_key = {
let mut cursor = batch.cursor(format.clone()).unwrap();
if let Some(start_bound) = start_bound {
cursor.seek_key_exact(start_bound);
}
cursor.get_key().map(|s| {
let mut key = batch.key_factory().default_box();
s.clone_to(key.as_mut());
key
})
}?;
let end_key = end_bound.map(|e| {
let mut key = batch.key_factory().default_box();
e.clone_to(key.as_mut());
key
});
Some(SplitCursorBuilder {
batch,
start_key,
end_key,
format,
})
}
pub fn build<'a>(&'a self) -> SplitCursor<'a> {
let mut cursor = self.batch.cursor(self.format.clone()).unwrap();
cursor.seek_key(self.start_key.as_data());
SplitCursor {
cursor,
start_key: self.start_key.clone(),
end_key: self.end_key.clone(),
}
}
}
#[doc(hidden)]
pub struct SplitCursor<'a> {
cursor: Box<dyn SerCursor + 'a>,
start_key: Box<DynData>,
end_key: Option<Box<DynData>>,
}
impl SplitCursor<'_> {
fn finished(&self) -> bool {
if let Some(ref end_key) = self.end_key
&& let Some(current_key) = self.cursor.get_key()
{
return current_key >= end_key.as_data();
}
false
}
}
impl SerCursor for SplitCursor<'_> {
fn key_valid(&self) -> bool {
self.cursor.key_valid() && !self.finished()
}
fn val_valid(&self) -> bool {
self.cursor.val_valid()
}
fn key(&self) -> &DynData {
self.cursor.key()
}
fn val(&self) -> &DynData {
self.cursor.val()
}
fn get_key(&self) -> Option<&DynData> {
if !self.key_valid() {
return None;
}
self.cursor.get_key()
}
fn get_val(&self) -> Option<&DynData> {
self.cursor.get_val()
}
fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_key(dst)
}
fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
self.cursor.key_to_json()
}
fn serialize_key_fields(
&mut self,
fields: &HashSet<String>,
dst: &mut Vec<u8>,
) -> AnyResult<()> {
self.cursor.serialize_key_fields(fields, dst)
}
fn serialize_val_fields(
&mut self,
fields: &HashSet<String>,
dst: &mut Vec<u8>,
) -> AnyResult<()> {
self.cursor.serialize_val_fields(fields, dst)
}
fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
self.cursor.serialize_key_to_arrow(dst)
}
fn serialize_key_to_arrow_with_metadata(
&mut self,
metadata: &dyn erased_serde::Serialize,
dst: &mut ArrayBuilder,
) -> AnyResult<()> {
self.cursor
.serialize_key_to_arrow_with_metadata(metadata, dst)
}
fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
self.cursor.serialize_val_to_arrow(dst)
}
fn serialize_val_to_arrow_with_metadata(
&mut self,
metadata: &dyn erased_serde::Serialize,
dst: &mut ArrayBuilder,
) -> AnyResult<()> {
self.cursor
.serialize_val_to_arrow_with_metadata(metadata, dst)
}
#[cfg(feature = "with-avro")]
fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
self.cursor.key_to_avro(schema, refs)
}
fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_key_weight(dst)
}
fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_val_weight(dst)
}
fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_val(dst)
}
fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
self.cursor.val_to_json()
}
#[cfg(feature = "with-avro")]
fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
self.cursor.val_to_avro(schema, refs)
}
fn weight(&mut self) -> i64 {
self.cursor.weight()
}
fn step_key(&mut self) {
self.cursor.step_key();
}
fn step_val(&mut self) {
self.cursor.step_val();
}
fn rewind_keys(&mut self) {
self.cursor.rewind_keys();
self.cursor.seek_key(self.start_key.as_data());
}
fn rewind_vals(&mut self) {
self.cursor.rewind_vals();
}
fn seek_key_exact(&mut self, key: &DynData) -> bool {
if let Some(ref end_key) = self.end_key
&& key >= end_key.as_data()
{
return false;
}
self.cursor.seek_key_exact(key)
}
fn seek_key(&mut self, key: &DynData) {
self.cursor.seek_key(key);
}
}
pub struct SerCursorFlattened<'a> {
val_valid: bool,
cursor: Box<dyn SerCursor + 'a>,
}
impl<'a> SerCursorFlattened<'a> {
pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
Self {
cursor,
val_valid: true,
}
}
}
impl<'a> SerCursor for SerCursorFlattened<'a> {
fn key_valid(&self) -> bool {
self.cursor.key_valid() && self.cursor.val_valid()
}
fn val_valid(&self) -> bool {
self.val_valid
}
fn key(&self) -> &DynData {
self.cursor.val()
}
fn get_key(&self) -> Option<&DynData> {
self.cursor.get_val()
}
fn val(&self) -> &DynData {
().erase()
}
fn get_val(&self) -> Option<&DynData> {
if self.val_valid {
Some(().erase())
} else {
None
}
}
fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_val(dst)
}
fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
self.cursor.val_to_json()
}
fn serialize_key_fields(
&mut self,
fields: &HashSet<String>,
dst: &mut Vec<u8>,
) -> AnyResult<()> {
self.cursor.serialize_val_fields(fields, dst)
}
fn serialize_val_fields(
&mut self,
_fields: &HashSet<String>,
_dst: &mut Vec<u8>,
) -> AnyResult<()> {
panic!("serialize_val_fields is not supported for flattened cursors");
}
fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
self.cursor.serialize_val_to_arrow(dst)
}
fn serialize_key_to_arrow_with_metadata(
&mut self,
metadata: &dyn erased_serde::Serialize,
dst: &mut ArrayBuilder,
) -> AnyResult<()> {
self.cursor
.serialize_val_to_arrow_with_metadata(metadata, dst)
}
fn serialize_val_to_arrow(&mut self, _dst: &mut ArrayBuilder) -> AnyResult<()> {
panic!("serialize_val_to_arrow is not supported for flattened cursors");
}
fn serialize_val_to_arrow_with_metadata(
&mut self,
_metadata: &dyn erased_serde::Serialize,
_dst: &mut ArrayBuilder,
) -> AnyResult<()> {
panic!("serialize_val_to_arrow_with_metadata is not supported for flattened cursors");
}
#[cfg(feature = "with-avro")]
fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
self.cursor.val_to_avro(schema, refs)
}
fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_val_weight(dst)
}
fn serialize_val_weight(&mut self, _dst: &mut Vec<u8>) -> AnyResult<()> {
panic!("serialize_val_weight is not supported for flattened cursors");
}
fn serialize_val(&mut self, _dst: &mut Vec<u8>) -> AnyResult<()> {
panic!("serialize_val is not supported for flattened cursors");
}
fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
panic!("val_to_json is not supported for flattened cursors");
}
#[cfg(feature = "with-avro")]
fn val_to_avro(&mut self, _schema: &AvroSchema, _refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
panic!("val_to_avro is not supported for flattened cursors");
}
fn weight(&mut self) -> i64 {
self.cursor.weight()
}
fn step_key(&mut self) {
debug_assert!(self.cursor.key_valid() && self.cursor.val_valid());
self.cursor.step_val();
while !self.cursor.val_valid() && self.cursor.key_valid() {
self.cursor.step_key();
}
self.val_valid = true;
}
fn step_val(&mut self) {
self.val_valid = false;
}
fn rewind_keys(&mut self) {
self.cursor.rewind_keys();
self.val_valid = true;
}
fn rewind_vals(&mut self) {
self.val_valid = true;
}
fn seek_key_exact(&mut self, _key: &DynData) -> bool {
panic!("seek_key_exact is not supported for flattened cursors");
}
fn seek_key(&mut self, _key: &DynData) {
panic!("seek_key is not supported for flattened cursors");
}
}
pub trait SerCursor: Send {
fn key_valid(&self) -> bool;
fn val_valid(&self) -> bool;
fn key(&self) -> &DynData;
fn get_key(&self) -> Option<&DynData>;
fn val(&self) -> &DynData;
fn get_val(&self) -> Option<&DynData>;
fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
fn key_to_json(&mut self) -> AnyResult<serde_json::Value>;
fn serialize_key_fields(
&mut self,
fields: &HashSet<String>,
dst: &mut Vec<u8>,
) -> AnyResult<()>;
fn serialize_val_fields(
&mut self,
fields: &HashSet<String>,
dst: &mut Vec<u8>,
) -> AnyResult<()>;
fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;
fn serialize_key_to_arrow_with_metadata(
&mut self,
metadata: &dyn erased_serde::Serialize,
dst: &mut ArrayBuilder,
) -> AnyResult<()>;
fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;
fn serialize_val_to_arrow_with_metadata(
&mut self,
metadata: &dyn erased_serde::Serialize,
dst: &mut ArrayBuilder,
) -> AnyResult<()>;
#[cfg(feature = "with-avro")]
fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;
fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
fn val_to_json(&mut self) -> AnyResult<serde_json::Value>;
#[cfg(feature = "with-avro")]
fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;
fn weight(&mut self) -> i64;
fn step_key(&mut self);
fn step_val(&mut self);
fn rewind_keys(&mut self);
fn rewind_vals(&mut self);
fn count_keys(&mut self) -> usize {
let mut count = 0;
while self.key_valid() {
count += 1;
self.step_key()
}
count
}
fn seek_key_exact(&mut self, key: &DynData) -> bool;
fn seek_key(&mut self, key: &DynData);
}
pub trait SerBatchReaderHandle: Send + Sync + DynClone {
fn num_nonempty_mailboxes(&self) -> usize;
fn take_from_worker(&self, worker: usize) -> Option<Box<dyn SerBatchReader>>;
fn take_from_all(&self) -> Vec<Arc<dyn SerBatchReader>>;
fn concat(&self) -> Arc<dyn SerBatchReader>;
}
dyn_clone::clone_trait_object!(SerBatchReaderHandle);
pub struct CursorWithPolarity<'a> {
cursor: Box<dyn SerCursor + 'a>,
second_pass: bool,
}
impl<'a> CursorWithPolarity<'a> {
pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
let mut result = Self {
cursor,
second_pass: false,
};
if result.key_valid() {
result.advance_val();
}
result
}
fn advance_val(&mut self) {
while self.cursor.val_valid()
&& ((!self.second_pass && self.cursor.weight() >= 0)
|| (self.second_pass && self.cursor.weight() <= 0))
{
self.step_val();
}
}
}
impl SerCursor for CursorWithPolarity<'_> {
fn key_valid(&self) -> bool {
self.cursor.key_valid()
}
fn val_valid(&self) -> bool {
self.cursor.val_valid()
}
fn key(&self) -> &DynData {
self.cursor.key()
}
fn get_key(&self) -> Option<&DynData> {
self.cursor.get_key()
}
fn val(&self) -> &DynData {
self.cursor.val()
}
fn get_val(&self) -> Option<&DynData> {
self.cursor.get_val()
}
fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_key(dst)
}
fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
self.cursor.key_to_json()
}
fn serialize_key_fields(
&mut self,
fields: &HashSet<String>,
dst: &mut Vec<u8>,
) -> AnyResult<()> {
self.cursor.serialize_key_fields(fields, dst)
}
fn serialize_val_fields(
&mut self,
fields: &HashSet<String>,
dst: &mut Vec<u8>,
) -> AnyResult<()> {
self.cursor.serialize_val_fields(fields, dst)
}
#[cfg(feature = "with-avro")]
fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
self.cursor.key_to_avro(schema, refs)
}
fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_key_weight(dst)
}
fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_val_weight(dst)
}
fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
self.cursor.serialize_key_to_arrow(dst)
}
fn serialize_key_to_arrow_with_metadata(
&mut self,
metadata: &dyn erased_serde::Serialize,
dst: &mut ArrayBuilder,
) -> AnyResult<()> {
self.cursor
.serialize_key_to_arrow_with_metadata(metadata, dst)
}
fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
self.cursor.serialize_val_to_arrow(dst)
}
fn serialize_val_to_arrow_with_metadata(
&mut self,
metadata: &dyn erased_serde::Serialize,
dst: &mut ArrayBuilder,
) -> AnyResult<()> {
self.cursor
.serialize_val_to_arrow_with_metadata(metadata, dst)
}
fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
self.cursor.serialize_val(dst)
}
fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
self.cursor.val_to_json()
}
#[cfg(feature = "with-avro")]
fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
self.cursor.val_to_avro(schema, refs)
}
fn weight(&mut self) -> i64 {
self.cursor.weight()
}
fn step_key(&mut self) {
self.cursor.step_key();
if !self.cursor.key_valid() && !self.second_pass {
self.cursor.rewind_keys();
self.second_pass = true;
}
if self.cursor.key_valid() {
self.advance_val();
}
}
fn step_val(&mut self) {
self.cursor.step_val();
self.advance_val();
}
fn rewind_keys(&mut self) {
self.cursor.rewind_keys();
self.second_pass = false;
if self.cursor.key_valid() {
self.advance_val();
}
}
fn rewind_vals(&mut self) {
self.cursor.rewind_vals();
self.advance_val();
}
fn seek_key_exact(&mut self, key: &DynData) -> bool {
self.cursor.seek_key_exact(key)
}
fn seek_key(&mut self, key: &DynData) {
self.cursor.seek_key(key);
}
}
pub trait CircuitCatalog: Send + Sync {
fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle>;
fn output_iter(
&self,
) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_>;
fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles>;
fn index_handles(
&self,
endpoint_name: &str,
stream: &SqlIdentifier,
index: &SqlIdentifier,
) -> Result<&OutputCollectionHandles, ControllerError>;
fn output_handles_mut(&mut self, name: &SqlIdentifier) -> Option<&mut OutputCollectionHandles>;
fn preprocessor_registry(&self) -> Arc<Mutex<PreprocessorRegistry>>;
}
#[doc(hidden)]
pub struct InputCollectionHandle {
pub schema: Relation,
pub handle: Box<dyn DeCollectionHandle>,
pub node_id: NodeId,
}
impl InputCollectionHandle {
#[doc(hidden)]
pub fn new<H>(schema: Relation, handle: H, node_id: NodeId) -> Self
where
H: DeCollectionHandle + 'static,
{
Self {
schema,
handle: Box::new(handle),
node_id,
}
}
}
#[derive(Clone)]
pub struct OutputCollectionHandles {
pub key_schema: Option<Relation>,
pub value_schema: Relation,
pub index_of: Option<SqlIdentifier>,
pub alias_as_index: Option<SqlIdentifier>,
pub integrate_handle: Option<Arc<dyn SerBatchReaderHandle>>,
pub delta_handle: Box<dyn SerBatchReaderHandle>,
pub enable_count: Arc<AtomicUsize>,
}
impl OutputCollectionHandles {
pub fn is_indexed(&self) -> bool {
self.key_schema.is_some()
}
}