use std::any::Any;
use std::borrow::Cow;
use std::cmp::max;
use std::fmt::{Display, Error as FmtError, Formatter};
use std::fs::File;
use std::hash::Hasher;
use std::io::{Error as IoError, Read};
use std::ops::{Add, AddAssign, Range};
use std::sync::Arc;
use actix_web::HttpRequest;
use anyhow::Result as AnyResult;
use dbsp::operator::input::StagedBuffers;
use erased_serde::Serialize as ErasedSerialize;
use feldera_types::config::ConnectorConfig;
use feldera_types::program_schema::Relation;
use feldera_types::serde_with_context::FieldParseError;
use serde::Serialize;
use serde::de::StdError;
use crate::ConnectorMetadata;
use crate::catalog::{InputCollectionHandle, SerBatchReader};
use crate::errors::controller::ControllerError;
use crate::postprocess::Postprocessor;
use crate::preprocess::Preprocessor;
use crate::transport::{OutputBatchType, Step};
/// Describes one input data format and creates parsers for it.
///
/// Implementations must be `Send + Sync`.
pub trait InputFormat: Send + Sync {
/// Returns the name of the format.
fn name(&self) -> Cow<'static, str>;
/// Builds a format configuration from an HTTP request.
///
/// Returns a type-erased serializable value on success and a
/// [`ControllerError`] on failure.
///
/// NOTE(review): presumably `endpoint_name` is only used to label errors and
/// the returned value is what later reaches `new_parser` as `config` —
/// confirm against the implementations, which are not visible here.
fn config_from_http_request(
&self,
endpoint_name: &str,
request: &HttpRequest,
) -> Result<Box<dyn ErasedSerialize>, ControllerError>;
/// Creates a new [`Parser`] for this format.
///
/// * `endpoint_name` - name of the endpoint the parser is created for.
/// * `input_stream` - handle to the input collection (presumably the
///   destination of the parsed records; not visible here).
/// * `config` - format-specific configuration as a JSON value.
fn new_parser(
&self,
endpoint_name: &str,
input_stream: &InputCollectionHandle,
config: &serde_json::Value,
) -> Result<Box<dyn Parser>, ControllerError>;
}
/// A collection of parsed records that have not been consumed yet.
///
/// The trait is object safe and is mostly used as `Box<dyn InputBuffer>`.
pub trait InputBuffer: Any + Send {
/// Flushes the buffer.
///
/// NOTE(review): presumably this pushes the buffered records to their
/// destination and leaves the buffer empty — the contract is not visible in
/// this file.
fn flush(&mut self);
/// Returns the current size of the buffer (records and bytes).
fn len(&self) -> BufferSize;
/// Feeds the buffer's content into `hasher`.
fn hash(&self, hasher: &mut dyn Hasher);
/// Returns `true` when [`len`](Self::len) reports zero records and zero bytes.
fn is_empty(&self) -> bool {
self.len().is_empty()
}
/// Removes records from this buffer and returns them as a new buffer.
///
/// `n` limits how many records are taken. `None` means that nothing was
/// taken.
///
/// NOTE(review): whether an implementation may return more or fewer than `n`
/// records is not specified in this file — confirm before relying on it.
fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>>;
/// Takes everything: equivalent to `take_some(usize::MAX)`.
fn take_all(&mut self) -> Option<Box<dyn InputBuffer>> {
self.take_some(usize::MAX)
}
}
/// Size of an input buffer, measured both in records and in bytes.
///
/// The default value is the empty size (zero records, zero bytes).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct BufferSize {
    /// Number of records.
    pub records: usize,
    /// Number of bytes.
    pub bytes: usize,
}

impl BufferSize {
    /// Returns the size of an empty buffer: zero records and zero bytes.
    pub fn empty() -> Self {
        BufferSize {
            records: 0,
            bytes: 0,
        }
    }

    /// Returns `true` only if both the record count and the byte count are zero.
    pub fn is_empty(&self) -> bool {
        *self == Self::empty()
    }
}

/// Component-wise sum of two sizes.
impl Add for BufferSize {
    type Output = Self;

    fn add(mut self, rhs: Self) -> Self::Output {
        self += rhs;
        self
    }
}

/// Component-wise in-place sum of two sizes.
impl AddAssign for BufferSize {
    fn add_assign(&mut self, rhs: Self) {
        let BufferSize { records, bytes } = rhs;
        self.records += records;
        self.bytes += bytes;
    }
}
/// An optional buffer behaves like the buffer it holds; `None` behaves like an
/// empty buffer.
impl InputBuffer for Option<Box<dyn InputBuffer>> {
    /// Size of the inner buffer, or the empty size for `None`.
    fn len(&self) -> BufferSize {
        match self {
            Some(inner) => inner.len(),
            None => BufferSize::empty(),
        }
    }

    /// Hashes the inner buffer; `None` leaves `hasher` untouched.
    fn hash(&self, hasher: &mut dyn Hasher) {
        match self {
            Some(inner) => inner.hash(hasher),
            None => {}
        }
    }

    /// Flushes the inner buffer, if there is one.
    fn flush(&mut self) {
        match self {
            Some(inner) => inner.flush(),
            None => {}
        }
    }

    /// Takes records from the inner buffer. The option itself stays `Some`
    /// even when the inner buffer ends up empty.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        match self {
            Some(inner) => inner.take_some(n),
            None => None,
        }
    }
}
/// Forwards every [`InputBuffer`] operation to the boxed buffer, so that a
/// `Box<dyn InputBuffer>` can be used wherever an `InputBuffer` is expected.
impl InputBuffer for Box<dyn InputBuffer> {
    fn len(&self) -> BufferSize {
        // Dereference down to the trait object explicitly so the call is
        // dispatched to the boxed buffer rather than back into this impl.
        (**self).len()
    }

    fn hash(&self, hasher: &mut dyn Hasher) {
        (**self).hash(hasher)
    }

    fn flush(&mut self) {
        (**self).flush()
    }

    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        (**self).take_some(n)
    }
}
/// Treats a list of buffers as one logical buffer whose content is the
/// concatenation of the individual buffers, in order.
impl InputBuffer for Vec<Box<dyn InputBuffer>> {
    /// Flushes every buffer, front to back.
    fn flush(&mut self) {
        for buffer in self.iter_mut() {
            buffer.flush();
        }
    }

    /// Returns the sum of the sizes of all buffers.
    fn len(&self) -> BufferSize {
        let mut total = BufferSize::empty();
        for buffer in self.iter() {
            total += buffer.len();
        }
        total
    }

    /// Hashes every buffer, front to back, into `hasher`.
    fn hash(&self, hasher: &mut dyn Hasher) {
        for buffer in self.iter() {
            buffer.hash(hasher);
        }
    }

    /// Takes up to `n` records from the front of the list.
    ///
    /// Buffers are visited in order until `n` records have been collected or
    /// the list is exhausted. The taken pieces are returned as a new list, or
    /// `None` if nothing could be taken.
    ///
    /// Only buffers that are empty after the take are removed from `self`. A
    /// buffer that was consumed partially stays in place so its remaining
    /// records are served by the next call.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        let mut taken: Vec<Box<dyn InputBuffer>> = Vec::new();
        let mut remaining = n;
        for buffer in self.iter_mut() {
            if remaining == 0 {
                break;
            }
            if let Some(chunk) = buffer.take_some(remaining) {
                // `saturating_sub` tolerates implementations that return more
                // records than requested.
                remaining = remaining.saturating_sub(chunk.len().records);
                taken.push(chunk);
            }
        }

        // Decide what to drop by looking at what is actually left in each
        // buffer. The size of the chunk just taken cannot tell a fully drained
        // buffer apart from one that merely satisfied the request, and relying
        // on it would discard the leftover records of the latter.
        let exhausted = self.iter().take_while(|buffer| buffer.is_empty()).count();
        self.drain(..exhausted);

        if taken.is_empty() {
            None
        } else {
            Some(Box::new(taken))
        }
    }
}
/// Flattens arbitrarily nested lists of buffers into a flat list of `Box<T>`.
///
/// Every element that is itself a `Vec<Box<dyn InputBuffer>>` is expanded
/// recursively; every other element is downcast to `T`. The relative order of
/// the leaves is preserved.
///
/// # Panics
///
/// Panics if a leaf buffer is not of type `T`.
pub fn flatten_nested<T>(buffers: Vec<Box<dyn InputBuffer>>) -> Vec<Box<T>>
where
    T: Any,
{
    /// Appends the leaves of `nested` to `leaves`, depth first.
    fn collect_leaves<T: Any>(nested: Vec<Box<dyn InputBuffer>>, leaves: &mut Vec<Box<T>>) {
        for entry in nested {
            // Upcast to `Any` so the concrete type can be inspected.
            let erased: Box<dyn Any> = entry;
            match erased.downcast::<Vec<Box<dyn InputBuffer>>>() {
                Ok(children) => collect_leaves(*children, leaves),
                Err(leaf) => leaves.push(leaf.downcast().unwrap()),
            }
        }
    }

    let mut leaves = Vec::new();
    collect_leaves(buffers, &mut leaves);
    leaves
}
/// An [`InputBuffer`] backed by staged buffers.
///
/// The reported size is supplied by the creator and stored alongside the
/// staged data; it is not recomputed from the data.
pub struct StagedInputBuffer {
    /// The staged data; flushing this buffer flushes it.
    buffer: Box<dyn StagedBuffers>,
    /// The size reported by `len`.
    size: BufferSize,
}

impl StagedInputBuffer {
    /// Wraps `buffer`, reporting `size` as its length.
    pub fn new(buffer: Box<dyn StagedBuffers>, size: BufferSize) -> Self {
        StagedInputBuffer { buffer, size }
    }
}
/// Partial [`InputBuffer`] implementation for staged data: only `flush` and
/// `len` are supported.
impl InputBuffer for StagedInputBuffer {
/// Flushes the underlying staged buffers.
fn flush(&mut self) {
self.buffer.flush()
}
/// Returns the size given at construction time. The stored value is never
/// updated, so it is reported unchanged even after `flush`.
fn len(&self) -> BufferSize {
self.size
}
/// Not supported.
///
/// # Panics
///
/// Always panics (`unimplemented!`).
fn hash(&self, _hasher: &mut dyn Hasher) {
unimplemented!()
}
/// Not supported.
///
/// # Panics
///
/// Always panics (`unimplemented!`). Because `take_all` defaults to
/// `take_some(usize::MAX)`, it panics as well.
fn take_some(&mut self, _n: usize) -> Option<Box<dyn InputBuffer>> {
unimplemented!()
}
}
/// Turns raw bytes into buffers of parsed records.
pub trait Parser: Send + Sync {
/// Parses `data`.
///
/// Returns the parsed records, if any, together with the errors encountered
/// while parsing. `metadata` is optional connector metadata handed to the
/// parser along with the data.
fn parse(
&mut self,
data: &[u8],
metadata: Option<ConnectorMetadata>,
) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>);
/// Converts a list of buffers into staged buffers.
///
/// NOTE(review): presumably `buffers` must have been produced by this parser
/// (or a fork of it) — confirm against the implementations.
fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
/// Returns a [`Splitter`] for this parser's input.
fn splitter(&self) -> Box<dyn Splitter>;
/// Returns another boxed parser.
///
/// NOTE(review): presumably an independent parser with the same
/// configuration — confirm against the implementations.
fn fork(&self) -> Box<dyn Parser>;
}
/// A [`Parser`] that runs a [`Preprocessor`] over the raw input, accumulates
/// the preprocessed bytes in a [`StreamSplitter`], and feeds the resulting
/// chunks to an inner parser.
pub struct StreamingPreprocessedParser {
    /// Transforms raw input before it is split and parsed.
    preprocessor: Box<dyn Preprocessor>,
    /// Accumulates preprocessed bytes and cuts them into chunks using the
    /// inner parser's splitter.
    stream_splitter: StreamSplitter,
    /// Parses the chunks produced by `stream_splitter`.
    parser: Box<dyn Parser>,
}

impl StreamingPreprocessedParser {
    /// Combines `preprocessor` and `parser`; the stream splitter is built from
    /// the splitter of `parser`.
    pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
        // Obtain the splitter before `parser` is moved into the struct.
        let stream_splitter = StreamSplitter::new(parser.splitter());
        StreamingPreprocessedParser {
            preprocessor,
            stream_splitter,
            parser,
        }
    }
}
impl Parser for StreamingPreprocessedParser {
    /// Preprocesses `data`, appends the result to the stream splitter and
    /// parses every chunk the splitter yields.
    ///
    /// Errors reported by the preprocessor come first in the returned list,
    /// followed by the errors of each parsed chunk. The parsed buffers are
    /// returned as one list, or `None` if no chunk produced any records.
    ///
    /// NOTE(review): the splitter is always drained with `eoi = true`, so a
    /// trailing partial record is handed to the parser immediately instead of
    /// waiting for more data — confirm that this is intended.
    fn parse(
        &mut self,
        data: &[u8],
        metadata: Option<ConnectorMetadata>,
    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
        let (pre_data, mut errors) = self.preprocessor.process(data);
        self.stream_splitter.append(&pre_data);

        let mut buffers: Vec<Box<dyn InputBuffer>> = Vec::new();
        while let Some(chunk) = self.stream_splitter.next(true) {
            let (buffer, chunk_errors) = self.parser.parse(chunk, metadata.clone());
            errors.extend(chunk_errors);
            buffers.extend(buffer);
        }

        let output: Option<Box<dyn InputBuffer>> = if buffers.is_empty() {
            None
        } else {
            Some(Box::new(buffers))
        };
        (output, errors)
    }

    /// Delegates staging to the inner parser.
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
        self.parser.stage(buffers)
    }

    /// Returns the preprocessor's splitter when it provides one, otherwise the
    /// inner parser's splitter.
    fn splitter(&self) -> Box<dyn Splitter> {
        match self.preprocessor.splitter() {
            Some(splitter) => splitter,
            None => self.parser.splitter(),
        }
    }

    /// Builds a new parser from forks of the preprocessor and the inner parser.
    fn fork(&self) -> Box<dyn Parser> {
        let preprocessor = self.preprocessor.fork();
        let parser = self.parser.fork();
        Box::new(Self::new(preprocessor, parser))
    }
}
/// A [`Parser`] that runs a [`Preprocessor`] over each input and parses the
/// preprocessed bytes right away, without carrying data over between calls.
pub struct MessageOrientedPreprocessedParser {
    /// Transforms raw input before it is parsed.
    preprocessor: Box<dyn Preprocessor>,
    /// Parses the preprocessed data.
    parser: Box<dyn Parser>,
}

impl MessageOrientedPreprocessedParser {
    /// Combines `preprocessor` and `parser`.
    pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
        MessageOrientedPreprocessedParser {
            preprocessor,
            parser,
        }
    }
}
impl Parser for MessageOrientedPreprocessedParser {
    /// Preprocesses `data`, cuts the result into chunks with a fresh splitter
    /// obtained from the inner parser, and parses each chunk.
    ///
    /// When the splitter finds no boundary, everything that is left is parsed
    /// as one final chunk. Errors reported by the preprocessor come first in
    /// the returned list, followed by the errors of each parsed chunk.
    ///
    /// NOTE(review): a splitter that answers `Some(0)` would produce an empty
    /// chunk without consuming anything, so the loop would not terminate —
    /// confirm that the `Splitter` contract rules this out.
    fn parse(
        &mut self,
        data: &[u8],
        metadata: Option<ConnectorMetadata>,
    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
        let (pre_data, mut errors) = self.preprocessor.process(data);
        let mut record_splitter = self.parser.splitter();

        let mut buffers: Vec<Box<dyn InputBuffer>> = Vec::new();
        let mut rest = pre_data.as_slice();
        while !rest.is_empty() {
            let cut = match record_splitter.input(rest) {
                Some(offset) => offset,
                None => rest.len(),
            };
            let (chunk, tail) = rest.split_at(cut);
            rest = tail;

            let (buffer, chunk_errors) = self.parser.parse(chunk, metadata.clone());
            errors.extend(chunk_errors);
            buffers.extend(buffer);
        }

        let output: Option<Box<dyn InputBuffer>> = if buffers.is_empty() {
            None
        } else {
            Some(Box::new(buffers))
        };
        (output, errors)
    }

    /// Delegates staging to the inner parser.
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
        self.parser.stage(buffers)
    }

    /// Returns the preprocessor's splitter when it provides one, otherwise the
    /// inner parser's splitter.
    fn splitter(&self) -> Box<dyn Splitter> {
        match self.preprocessor.splitter() {
            Some(splitter) => splitter,
            None => self.parser.splitter(),
        }
    }

    /// Builds a new parser from forks of the preprocessor and the inner parser.
    fn fork(&self) -> Box<dyn Parser> {
        let preprocessor = self.preprocessor.fork();
        let parser = self.parser.fork();
        Box::new(Self::new(preprocessor, parser))
    }
}
/// Finds chunk boundaries in a stream of bytes.
pub trait Splitter: Send + Sync {
/// Looks for a boundary in `data`.
///
/// As used by `StreamSplitter::next`: `Some(n)` means a chunk ends `n` bytes
/// into `data`; `None` means no boundary was found. After `None` the caller
/// does not present the same bytes again, so an implementation has to
/// remember whatever it needs from them.
fn input(&mut self, data: &[u8]) -> Option<usize>;
/// Resets the splitter. `StreamSplitter::seek` calls this when it jumps to a
/// new stream position — presumably to discard state remembered from earlier
/// `input` calls.
fn clear(&mut self);
}
/// Accumulates a byte stream and hands it out in chunks whose boundaries are
/// chosen by a [`Splitter`].
///
/// Data enters through [`append`](Self::append) or [`read`](Self::read) and
/// leaves through [`next`](Self::next).
pub struct StreamSplitter {
    /// Backing storage. Only the bytes inside `fragment` are pending data.
    buffer: Vec<u8>,
    /// Stream offset of `buffer[0]`.
    start: u64,
    /// Range of `buffer` holding bytes that have not been returned yet.
    fragment: Range<usize>,
    /// Index in `buffer` up to which bytes were already shown to `splitter`.
    fed: usize,
    /// Decides where chunks end.
    splitter: Box<dyn Splitter>,
}

impl StreamSplitter {
    /// Creates an empty stream splitter positioned at stream offset 0.
    pub fn new(splitter: Box<dyn Splitter>) -> Self {
        StreamSplitter {
            buffer: Vec::new(),
            start: 0,
            fragment: 0..0,
            fed: 0,
            splitter,
        }
    }

    /// Returns the next chunk, if one is available.
    ///
    /// The bytes not yet shown to the splitter are presented to it. If it
    /// reports a boundary, everything pending up to that boundary is returned.
    /// Otherwise the pending bytes are returned as a final chunk when `eoi`
    /// (end of input) is set and there are any; in all other cases the result
    /// is `None`.
    pub fn next(&mut self, eoi: bool) -> Option<&[u8]> {
        let unfed = &self.buffer[self.fed..self.fragment.end];
        if let Some(n) = self.splitter.input(unfed) {
            let chunk_end = self.fed + n;
            let chunk = &self.buffer[self.fragment.start..chunk_end];
            self.fed = chunk_end;
            self.fragment.start = chunk_end;
            return Some(chunk);
        }

        // The splitter has now seen everything that is pending.
        self.fed = self.fragment.end;
        if !eoi || self.fragment.is_empty() {
            return None;
        }
        let chunk = &self.buffer[self.fragment.clone()];
        self.fragment.start = self.fragment.end;
        Some(chunk)
    }

    /// Appends `data` to the pending bytes.
    ///
    /// Pending bytes are first moved to the front of the buffer; `start` and
    /// `fed` are adjusted so stream positions stay the same.
    pub fn append(&mut self, data: &[u8]) {
        let shift = self.fragment.start;
        let pending = self.fragment.len();

        let needed = pending + data.len();
        let current = self.buffer.len();
        if needed > current {
            self.buffer.reserve(needed - current);
        }

        self.buffer.copy_within(self.fragment.clone(), 0);
        // The pending bytes now occupy `0..pending`; drop everything behind
        // them before adding the new data.
        self.buffer.truncate(pending);
        self.buffer.extend_from_slice(data);

        self.fed -= shift;
        self.start += shift as u64;
        self.fragment = 0..self.buffer.len();
    }

    /// Reads from `file` into the free space behind the pending bytes.
    ///
    /// Pending bytes are first moved to the front of the buffer. If the buffer
    /// has no free space it is grown to `buffer_size` or twice its current
    /// capacity, whichever is larger. At most `limit` bytes are requested from
    /// the file.
    ///
    /// Returns the result of the underlying `read` call; on success the bytes
    /// read become pending.
    pub fn read(
        &mut self,
        file: &mut File,
        buffer_size: usize,
        limit: usize,
    ) -> Result<usize, IoError> {
        let shift = self.fragment.start;
        if shift != 0 {
            self.buffer.copy_within(self.fragment.clone(), 0);
            self.fed -= shift;
            self.start += shift as u64;
            self.fragment = 0..self.fragment.len();
        }

        let filled = self.fragment.len();
        if filled == self.buffer.len() {
            let grown = max(buffer_size, self.buffer.capacity() * 2);
            self.buffer.resize(grown, 0);
        }

        let free = &mut self.buffer[filled..];
        let window = if free.len() > limit { limit } else { free.len() };
        let outcome = file.read(&mut free[..window]);
        if let Ok(count) = outcome {
            self.fragment.end += count;
        }
        outcome
    }

    /// Returns the stream offset of the first byte that has not been returned
    /// by [`next`](Self::next) yet.
    pub fn position(&self) -> u64 {
        self.start + self.fragment.start as u64
    }

    /// Discards all pending bytes, resets the splitter and continues at stream
    /// offset `offset`.
    pub fn seek(&mut self, offset: u64) {
        self.splitter.clear();
        self.fed = 0;
        self.fragment = 0..0;
        self.start = offset;
    }

    /// Equivalent to `seek(0)`.
    pub fn reset(&mut self) {
        self.seek(0);
    }
}
/// Describes one output data format and creates encoders for it.
///
/// Implementations must be `Send + Sync`.
pub trait OutputFormat: Send + Sync {
/// Returns the name of the format.
fn name(&self) -> Cow<'static, str>;
/// Builds a format configuration from an HTTP request.
///
/// Returns a type-erased serializable value on success and a
/// [`ControllerError`] on failure.
fn config_from_http_request(
&self,
endpoint_name: &str,
request: &HttpRequest,
) -> Result<Box<dyn ErasedSerialize>, ControllerError>;
/// Creates a new [`Encoder`] for this format.
///
/// * `endpoint_name` - name of the endpoint the encoder is created for.
/// * `config` - connector configuration.
/// * `key_schema` - optional schema of the key.
/// * `value_schema` - schema of the value.
/// * `consumer` - receives the encoded output.
/// * `is_index` - NOTE(review): presumably tells the encoder that the output
///   is an index rather than a plain relation — confirm with callers.
fn new_encoder(
&self,
endpoint_name: &str,
config: &ConnectorConfig,
key_schema: &Option<Relation>,
value_schema: &Relation,
consumer: Box<dyn OutputConsumer>,
is_index: bool,
) -> Result<Box<dyn Encoder>, ControllerError>;
}
/// Encodes batches of records and passes the result to an [`OutputConsumer`].
pub trait Encoder: Send {
/// Returns the consumer attached to this encoder.
fn consumer(&mut self) -> &mut dyn OutputConsumer;
/// Encodes `batch`. Returns an error if encoding fails.
fn encode(&mut self, batch: Arc<dyn SerBatchReader>) -> AnyResult<()>;
}
/// Receives encoded output, grouped into batches delimited by `batch_start`
/// and `batch_end`.
#[doc(hidden)]
pub trait OutputConsumer: Send {
/// Returns a buffer size limit in bytes.
///
/// NOTE(review): presumably the largest buffer an encoder should pass to
/// `push_buffer` — confirm with the encoder implementations.
fn max_buffer_size_bytes(&self) -> usize;
/// Marks the start of a batch for `step` with the given batch type.
fn batch_start(&mut self, step: Step, batch_type: OutputBatchType);
/// Passes an encoded buffer that represents `num_records` records.
fn push_buffer(&mut self, buffer: &[u8], num_records: usize);
/// Passes an encoded message made of an optional key, an optional value and
/// a list of named headers, representing `num_records` records.
fn push_key(
&mut self,
key: Option<&[u8]>,
val: Option<&[u8]>,
headers: &[(&str, Option<&[u8]>)],
num_records: usize,
);
/// Marks the end of the current batch.
fn batch_end(&mut self);
/// Returns the consumer's memory usage; the default implementation reports 0.
///
/// NOTE(review): the unit is presumably bytes — confirm.
fn memory(&self) -> usize {
0
}
}
/// Callback that `PostprocessedConsumer` invokes with the error whenever its
/// postprocessor fails.
pub type PostprocessorErrorCallback = Box<dyn Fn(anyhow::Error) + Send + Sync>;
/// An [`OutputConsumer`] that passes all output through a [`Postprocessor`]
/// before forwarding it to another consumer.
pub struct PostprocessedConsumer {
    /// Receives the postprocessed output.
    inner: Box<dyn OutputConsumer>,
    /// Transforms buffers and keyed messages.
    postprocessor: Box<dyn Postprocessor>,
    /// Invoked with the error whenever the postprocessor fails.
    error_cb: PostprocessorErrorCallback,
}

impl PostprocessedConsumer {
    /// Creates a consumer that postprocesses output with `postprocessor`,
    /// forwards the result to `inner`, and reports failures to `error_cb`.
    pub fn new(
        inner: Box<dyn OutputConsumer>,
        postprocessor: Box<dyn Postprocessor>,
        error_cb: PostprocessorErrorCallback,
    ) -> Self {
        PostprocessedConsumer {
            inner,
            postprocessor,
            error_cb,
        }
    }
}
impl OutputConsumer for PostprocessedConsumer {
    /// Reports the limit of the wrapped consumer.
    fn max_buffer_size_bytes(&self) -> usize {
        self.inner.max_buffer_size_bytes()
    }

    /// Notifies the postprocessor first, then the wrapped consumer.
    fn batch_start(&mut self, step: Step, batch_type: OutputBatchType) {
        self.postprocessor.batch_start(step, batch_type);
        self.inner.batch_start(step, batch_type);
    }

    /// Postprocesses `buffer` and forwards the result. If postprocessing
    /// fails, the error goes to the error callback and nothing is forwarded.
    fn push_buffer(&mut self, buffer: &[u8], num_records: usize) {
        let transformed = match self.postprocessor.push_buffer(buffer) {
            Ok(transformed) => transformed,
            Err(error) => {
                (self.error_cb)(error);
                return;
            }
        };
        self.inner.push_buffer(&transformed, num_records);
    }

    /// Postprocesses a keyed message and forwards the result. If
    /// postprocessing fails, the error goes to the error callback and nothing
    /// is forwarded.
    fn push_key(
        &mut self,
        key: Option<&[u8]>,
        val: Option<&[u8]>,
        headers: &[(&str, Option<&[u8]>)],
        num_records: usize,
    ) {
        let (out_key, out_val, out_headers) = match self.postprocessor.push_key(key, val, headers)
        {
            Ok(transformed) => transformed,
            Err(error) => {
                (self.error_cb)(error);
                return;
            }
        };
        // The postprocessor returns owned headers; the wrapped consumer
        // expects borrowed ones.
        let header_refs: Vec<(&str, Option<&[u8]>)> = out_headers
            .iter()
            .map(|(name, value)| (name.as_str(), value.as_deref()))
            .collect();
        self.inner.push_key(
            out_key.as_deref(),
            out_val.as_deref(),
            &header_refs,
            num_records,
        );
    }

    /// Notifies the postprocessor first, then the wrapped consumer.
    fn batch_end(&mut self) {
        self.postprocessor.batch_end();
        self.inner.batch_end();
    }

    /// Memory used by the wrapped consumer plus the postprocessor.
    fn memory(&self) -> usize {
        let consumer_memory = self.inner.memory();
        let postprocessor_memory = self.postprocessor.memory();
        consumer_memory + postprocessor_memory
    }
}
// NOTE(review): neither constant is used in this file; the descriptions below
// are inferred from the names and should be confirmed against the call sites.

/// Presumably an upper bound on how many times a single record may be
/// duplicated.
pub const MAX_DUPLICATES: i64 = 1_000_000;
/// Presumably the maximum length of a record quoted in an error message.
pub const MAX_RECORD_LEN_IN_ERRMSG: usize = 4096;
/// An error encountered while parsing input data.
///
/// The details live in a boxed [`ParseErrorInner`], which keeps this type
/// pointer-sized. Serialization is transparent: a `ParseError` serializes
/// exactly like its inner value.
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
#[serde(transparent)]
pub struct ParseError(Box<ParseErrorInner>);

/// Formats the error exactly like the wrapped [`ParseErrorInner`].
impl Display for ParseError {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        Display::fmt(&self.0, f)
    }
}

impl StdError for ParseError {}
impl ParseError {
pub fn new(
description: String,
event_number: Option<u64>,
field: Option<String>,
invalid_text: Option<&str>,
invalid_bytes: Option<&[u8]>,
suggestion: Option<Cow<'static, str>>,
error_tag: Option<String>,
) -> Self {
Self(Box::new(ParseErrorInner::new(
description,
event_number,
field,
invalid_text,
invalid_bytes,
suggestion,
error_tag,
)))
}
pub fn text_event_error<E>(
msg: &str,
error: E,
event_number: u64,
invalid_text: Option<&str>,
suggestion: Option<Cow<'static, str>>,
) -> Self
where
E: ToString,
{
Self(Box::new(ParseErrorInner::text_event_error(
msg,
error,
event_number,
invalid_text,
suggestion,
)))
}
pub fn text_envelope_error(
description: String,
invalid_text: &str,
suggestion: Option<Cow<'static, str>>,
) -> Self {
Self(Box::new(ParseErrorInner::text_envelope_error(
description,
invalid_text,
suggestion,
)))
}
pub fn bin_event_error(
description: String,
event_number: u64,
invalid_bytes: &[u8],
suggestion: Option<Cow<'static, str>>,
) -> Self {
Self(Box::new(ParseErrorInner::bin_event_error(
description,
event_number,
invalid_bytes,
suggestion,
)))
}
pub fn bin_envelope_error(
description: String,
invalid_bytes: &[u8],
suggestion: Option<Cow<'static, str>>,
) -> Self {
Self(Box::new(ParseErrorInner::bin_envelope_error(
description,
invalid_bytes,
suggestion,
)))
}
pub fn map_description<F>(self, f: F) -> Self
where
F: FnOnce(&str) -> String,
{
let mut inner = self.0;
let description = f(&inner.description);
inner.description = description;
Self(inner)
}
pub fn get_error_tag(&self) -> Option<String> {
self.0.get_error_tag()
}
}
/// The content of a `ParseError`.
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
pub struct ParseErrorInner {
/// Description of the error.
description: String,
/// Number of the event the error refers to, if it refers to a single event.
/// Shown as `(event #N)` when the error is displayed.
event_number: Option<u64>,
/// Name of the field that failed to parse, when known.
field: Option<String>,
/// The offending input as raw bytes. When displayed it takes precedence over
/// `invalid_text`.
invalid_bytes: Option<Vec<u8>>,
/// The offending input as text.
invalid_text: Option<String>,
/// Optional hint, appended on its own line when the error is displayed.
suggestion: Option<Cow<'static, str>>,
/// Optional tag identifying the kind of error, e.g. `"text_event_err"`.
tag: Option<String>,
}
/// Renders the error as
/// `Parse error[ (event #N)]: <description>`, followed on separate lines by
/// the invalid input (bytes take precedence over text) and the suggestion,
/// when present.
impl Display for ParseErrorInner {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let event = self
            .event_number
            .map(|event_number| format!(" (event #{})", event_number))
            .unwrap_or_default();

        let invalid_fragment = match (&self.invalid_bytes, &self.invalid_text) {
            (Some(invalid_bytes), _) => format!("\nInvalid bytes: {invalid_bytes:?}"),
            (None, Some(invalid_text)) => format!("\nInvalid fragment: '{invalid_text}'"),
            (None, None) => String::new(),
        };

        let suggestion = self
            .suggestion
            .as_ref()
            .map(|suggestion| format!("\n{suggestion}"))
            .unwrap_or_default();

        write!(
            f,
            "Parse error{event}: {}{invalid_fragment}{suggestion}",
            self.description
        )
    }
}
impl ParseErrorInner {
    /// Creates a parse error from all of its parts.
    ///
    /// `invalid_text` and `invalid_bytes` are copied into owned storage.
    pub fn new(
        description: String,
        event_number: Option<u64>,
        field: Option<String>,
        invalid_text: Option<&str>,
        invalid_bytes: Option<&[u8]>,
        suggestion: Option<Cow<'static, str>>,
        error_tag: Option<String>,
    ) -> Self {
        let invalid_text = invalid_text.map(String::from);
        let invalid_bytes = invalid_bytes.map(<[u8]>::to_vec);
        Self {
            description,
            event_number,
            field,
            invalid_text,
            invalid_bytes,
            suggestion,
            tag: error_tag,
        }
    }

    /// Creates an error for event `event_number` of a text-based format.
    ///
    /// `error` is rendered with `to_string`. If the rendered text contains a
    /// JSON object starting with `{"field":` that deserializes as a
    /// `FieldParseError`, its field name and description are used; otherwise
    /// the whole rendered text is the description and no field is recorded.
    /// The final description is `msg`, optionally followed by
    /// `: error parsing field '<field>'`, followed by `: <description>`.
    ///
    /// The error is tagged `"text_event_err"`.
    pub fn text_event_error<E: ToString>(
        msg: &str,
        error: E,
        event_number: u64,
        invalid_text: Option<&str>,
        suggestion: Option<Cow<'static, str>>,
    ) -> Self {
        let err_str = error.to_string();

        // Try to pull a structured field error out of the message.
        let field_error = err_str.find("{\"field\":").and_then(|offset| {
            serde_json::Deserializer::from_str(&err_str[offset..])
                .into_iter::<FieldParseError>()
                .next()
                .and_then(Result::ok)
        });
        let (descr, field) = match field_error {
            Some(err) => (err.description, Some(err.field)),
            None => (err_str, None),
        };

        let column_name = field
            .as_ref()
            .map(|field| format!(": error parsing field '{field}'"))
            .unwrap_or_default();

        Self::new(
            format!("{msg}{column_name}: {descr}",),
            Some(event_number),
            field,
            invalid_text,
            None,
            suggestion,
            Some("text_event_err".to_string()),
        )
    }

    /// Creates an error for a text-based input that is not tied to a single
    /// event. The error is tagged `"text_envelope_err"`.
    pub fn text_envelope_error(
        description: String,
        invalid_text: &str,
        suggestion: Option<Cow<'static, str>>,
    ) -> Self {
        let tag = "text_envelope_err".to_string();
        Self::new(
            description,
            None,
            None,
            Some(invalid_text),
            None,
            suggestion,
            Some(tag),
        )
    }

    /// Creates an error for event `event_number` of a binary format. The error
    /// is tagged `"bin_event_err"`.
    pub fn bin_event_error(
        description: String,
        event_number: u64,
        invalid_bytes: &[u8],
        suggestion: Option<Cow<'static, str>>,
    ) -> Self {
        let tag = "bin_event_err".to_string();
        Self::new(
            description,
            Some(event_number),
            None,
            None,
            Some(invalid_bytes),
            suggestion,
            Some(tag),
        )
    }

    /// Creates an error for a binary input that is not tied to a single event.
    /// The error is tagged `"bin_envelope_err"`.
    pub fn bin_envelope_error(
        description: String,
        invalid_bytes: &[u8],
        suggestion: Option<Cow<'static, str>>,
    ) -> Self {
        let tag = "bin_envelope_err".to_string();
        Self::new(
            description,
            None,
            None,
            None,
            Some(invalid_bytes),
            suggestion,
            Some(tag),
        )
    }

    /// Returns a copy of the error tag, if the error has one.
    pub fn get_error_tag(&self) -> Option<String> {
        self.tag.as_ref().cloned()
    }
}