use std::any::Any;
use std::borrow::Cow;
use std::fmt::{Display, Error as FmtError, Formatter};
use std::hash::Hasher;
use std::ops::{Add, AddAssign};
use std::sync::Arc;
use actix_web::HttpRequest;
use anyhow::Result as AnyResult;
use dbsp::operator::input::StagedBuffers;
use erased_serde::Serialize as ErasedSerialize;
use feldera_types::config::ConnectorConfig;
use feldera_types::program_schema::Relation;
use feldera_types::serde_with_context::FieldParseError;
use serde::Serialize;
use serde::de::StdError;
use crate::ConnectorMetadata;
use crate::catalog::{InputCollectionHandle, SerBatchReader};
use crate::errors::controller::ControllerError;
use crate::transport::Step;
/// Factory interface for an input data format.
///
/// An implementation identifies itself by name, can derive a parser
/// configuration from an HTTP request, and can create [`Parser`] instances.
pub trait InputFormat: Send + Sync {
/// Returns the name of this format.
// NOTE(review): presumably the name must be unique among registered
// formats -- confirm against the format registry.
fn name(&self) -> Cow<'static, str>;
/// Builds a parser configuration from an HTTP request.
///
/// # Arguments
///
/// * `endpoint_name` - name of the endpoint the configuration is built for.
/// * `request` - the HTTP request to read the configuration from.
///
/// # Errors
///
/// Returns a [`ControllerError`] when no configuration can be produced.
// NOTE(review): which parts of the request are inspected (query string,
// headers, ...) is implementation-defined and not visible here.
fn config_from_http_request(
&self,
endpoint_name: &str,
request: &HttpRequest,
) -> Result<Box<dyn ErasedSerialize>, ControllerError>;
/// Creates a new parser for this format.
///
/// # Arguments
///
/// * `endpoint_name` - name of the endpoint the parser is created for.
/// * `input_stream` - handle of the input collection associated with the
///   parser.
/// * `config` - format-specific configuration as a JSON value.
///
/// # Errors
///
/// Returns a [`ControllerError`] when the parser cannot be created.
fn new_parser(
&self,
endpoint_name: &str,
input_stream: &InputCollectionHandle,
config: &serde_json::Value,
) -> Result<Box<dyn Parser>, ControllerError>;
}
/// A buffer of parsed input whose size can be queried and whose contents can
/// be flushed, hashed, or split off into another buffer.
pub trait InputBuffer: Any + Send {
    /// Flushes the buffer.
    // NOTE(review): presumably pushes the buffered records downstream and
    // leaves the buffer empty -- confirm against implementations.
    fn flush(&mut self);

    /// Returns the current size of the buffer as a [`BufferSize`].
    fn len(&self) -> BufferSize;

    /// Feeds the buffer into `hasher`.
    // NOTE(review): exactly what is hashed is implementation-defined.
    fn hash(&self, hasher: &mut dyn Hasher);

    /// Returns `true` when the size reported by [`InputBuffer::len`] is empty.
    fn is_empty(&self) -> bool {
        let size = self.len();
        size.is_empty()
    }

    /// Takes some of the buffer's contents, governed by `n`, and returns them
    /// as a new buffer, or `None`.
    // NOTE(review): presumably removes up to `n` records and returns `None`
    // when nothing is available -- confirm against implementations.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>>;

    /// Takes everything by calling [`InputBuffer::take_some`] with
    /// `usize::MAX`.
    fn take_all(&mut self) -> Option<Box<dyn InputBuffer>> {
        let everything = usize::MAX;
        self.take_some(everything)
    }
}
/// Size of a buffer, tracked as a record count and a byte count.
///
/// Sizes can be combined with `+` and `+=`, which add the two counters
/// field by field.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct BufferSize {
    /// Number of records.
    pub records: usize,
    /// Number of bytes.
    pub bytes: usize,
}

impl BufferSize {
    /// Returns a size with both counters set to zero.
    pub fn empty() -> Self {
        Self {
            records: 0,
            bytes: 0,
        }
    }

    /// Returns `true` only when both the record and byte counts are zero.
    pub fn is_empty(&self) -> bool {
        matches!(
            self,
            Self {
                records: 0,
                bytes: 0
            }
        )
    }
}

impl Add for BufferSize {
    type Output = Self;

    /// Adds the record counts and the byte counts of the two operands.
    fn add(mut self, rhs: Self) -> Self::Output {
        self += rhs;
        self
    }
}

impl AddAssign for BufferSize {
    /// Adds `rhs`'s record and byte counts to `self` in place.
    fn add_assign(&mut self, rhs: Self) {
        let Self { records, bytes } = rhs;
        self.records += records;
        self.bytes += bytes;
    }
}
/// An optional boxed buffer is itself a buffer: `None` behaves as an empty
/// buffer and `Some` forwards every call to the wrapped buffer.
impl InputBuffer for Option<Box<dyn InputBuffer>> {
    /// Size of the wrapped buffer, or the empty size for `None`.
    fn len(&self) -> BufferSize {
        match self {
            Some(inner) => inner.len(),
            None => BufferSize::empty(),
        }
    }

    /// Hashes the wrapped buffer; feeds nothing to `hasher` for `None`.
    fn hash(&self, hasher: &mut dyn Hasher) {
        match self {
            Some(inner) => inner.hash(hasher),
            None => {}
        }
    }

    /// Flushes the wrapped buffer; does nothing for `None`.
    fn flush(&mut self) {
        match self {
            Some(inner) => inner.flush(),
            None => {}
        }
    }

    /// Forwards to the wrapped buffer's `take_some`; yields `None` when there
    /// is no wrapped buffer. The `Option` itself is left in place.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        match self {
            Some(inner) => inner.take_some(n),
            None => None,
        }
    }
}
/// A boxed buffer is itself a buffer: every method forwards to the boxed
/// trait object.
impl InputBuffer for Box<dyn InputBuffer> {
    /// Size of the boxed buffer.
    fn len(&self) -> BufferSize {
        // Dereference through the reference and the box so the call is
        // dispatched on the inner trait object rather than on the box.
        (**self).len()
    }

    /// Hashes the boxed buffer into `hasher`.
    fn hash(&self, hasher: &mut dyn Hasher) {
        (**self).hash(hasher)
    }

    /// Flushes the boxed buffer.
    fn flush(&mut self) {
        (**self).flush()
    }

    /// Forwards to the boxed buffer's `take_some`.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        (**self).take_some(n)
    }
}
/// Pairs a set of staged buffers with the size recorded for them.
pub struct StagedInputBuffer {
    /// The staged buffers.
    buffer: Box<dyn StagedBuffers>,
    /// Size supplied at construction time.
    size: BufferSize,
}

impl StagedInputBuffer {
    /// Wraps `buffer` together with its `size`.
    ///
    /// The size is stored as given; it is not computed from `buffer`.
    pub fn new(buffer: Box<dyn StagedBuffers>, size: BufferSize) -> Self {
        StagedInputBuffer { buffer, size }
    }
}
/// [`InputBuffer`] view of staged buffers.
///
/// Only `flush` and `len` are supported; `hash` and `take_some` panic.
impl InputBuffer for StagedInputBuffer {
/// Flushes the underlying staged buffers.
fn flush(&mut self) {
self.buffer.flush()
}
/// Returns the size that was supplied when this value was constructed.
fn len(&self) -> BufferSize {
self.size
}
/// Not supported for staged buffers.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
fn hash(&self, _hasher: &mut dyn Hasher) {
unimplemented!()
}
/// Not supported for staged buffers.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
fn take_some(&mut self, _n: usize) -> Option<Box<dyn InputBuffer>> {
unimplemented!()
}
}
/// Parses raw bytes into input buffers and reports parse errors.
pub trait Parser: Send + Sync {
/// Parses `data`.
///
/// # Arguments
///
/// * `data` - raw bytes to parse.
/// * `metadata` - optional connector metadata accompanying the data.
///
/// # Returns
///
/// A pair of the buffer holding the parsed input (if any) and the list of
/// errors encountered while parsing.
// NOTE(review): whether `data` must hold whole records (see `splitter`)
// is not visible here -- confirm against implementations.
fn parse(
&mut self,
data: &[u8],
metadata: Option<ConnectorMetadata>,
) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>);
/// Converts a collection of input buffers into staged buffers.
// NOTE(review): presumably `buffers` must have been produced by this
// parser (or a fork of it) -- confirm.
fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
/// Returns a [`Splitter`] for this parser's format.
fn splitter(&self) -> Box<dyn Splitter>;
/// Creates another parser from this one.
// NOTE(review): presumably the fork shares configuration but not parsing
// state with `self` -- confirm.
fn fork(&self) -> Box<dyn Parser>;
}
/// Incrementally consumes raw bytes on behalf of a [`Parser`].
// NOTE(review): presumably used to locate record boundaries in a byte
// stream so that only complete chunks are handed to `Parser::parse` --
// confirm against implementations.
pub trait Splitter: Send {
/// Examines `data` and optionally returns an offset.
// NOTE(review): presumably `Some(n)` means the first `n` bytes of `data`
// form a complete chunk and `None` means more input is needed -- confirm.
fn input(&mut self, data: &[u8]) -> Option<usize>;
/// Clears the splitter.
// NOTE(review): presumably discards any state accumulated by `input`.
fn clear(&mut self);
}
/// Factory interface for an output data format.
///
/// An implementation identifies itself by name, can derive an encoder
/// configuration from an HTTP request, and can create [`Encoder`] instances.
pub trait OutputFormat: Send + Sync {
/// Returns the name of this format.
fn name(&self) -> Cow<'static, str>;
/// Builds an encoder configuration from an HTTP request.
///
/// # Arguments
///
/// * `endpoint_name` - name of the endpoint the configuration is built for.
/// * `request` - the HTTP request to read the configuration from.
///
/// # Errors
///
/// Returns a [`ControllerError`] when no configuration can be produced.
fn config_from_http_request(
&self,
endpoint_name: &str,
request: &HttpRequest,
) -> Result<Box<dyn ErasedSerialize>, ControllerError>;
/// Creates a new encoder for this format.
///
/// # Arguments
///
/// * `endpoint_name` - name of the endpoint the encoder is created for.
/// * `config` - connector configuration.
/// * `key_schema` - optional schema of the key.
/// * `value_schema` - schema of the value.
/// * `consumer` - consumer handed to the new encoder; ownership is
///   transferred.
///
/// # Errors
///
/// Returns a [`ControllerError`] when the encoder cannot be created.
// NOTE(review): when `key_schema` is expected to be `Some` is not visible
// here -- confirm against callers.
fn new_encoder(
&self,
endpoint_name: &str,
config: &ConnectorConfig,
key_schema: &Option<Relation>,
value_schema: &Relation,
consumer: Box<dyn OutputConsumer>,
) -> Result<Box<dyn Encoder>, ControllerError>;
}
/// Encodes output batches and exposes the consumer associated with the
/// encoder.
pub trait Encoder: Send {
/// Returns a mutable reference to the encoder's [`OutputConsumer`].
fn consumer(&mut self) -> &mut dyn OutputConsumer;
/// Encodes a batch.
///
/// # Errors
///
/// Returns an error when encoding fails.
// NOTE(review): presumably the encoded data is pushed to the consumer
// returned by `consumer()` -- confirm against implementations.
fn encode(&mut self, batch: Arc<dyn SerBatchReader>) -> AnyResult<()>;
}
/// Receiver of encoded output, delivered between `batch_start` and
/// `batch_end` calls.
pub trait OutputConsumer: Send {
/// Returns the maximum buffer size, in bytes, reported by this consumer.
// NOTE(review): presumably an upper bound on the buffers passed to
// `push_buffer` -- confirm.
fn max_buffer_size_bytes(&self) -> usize;
/// Signals the start of a batch for the given step.
fn batch_start(&mut self, step: Step);
/// Pushes an encoded buffer.
///
/// * `buffer` - the encoded bytes.
/// * `num_records` - record count associated with `buffer`.
fn push_buffer(&mut self, buffer: &[u8], num_records: usize);
/// Pushes an encoded key/value pair with headers.
///
/// * `key` - optional encoded key.
/// * `val` - optional encoded value.
/// * `headers` - list of `(name, optional value)` header pairs.
/// * `num_records` - record count associated with this message.
fn push_key(
&mut self,
key: Option<&[u8]>,
val: Option<&[u8]>,
headers: &[(&str, Option<&[u8]>)],
num_records: usize,
);
/// Signals the end of the current batch.
fn batch_end(&mut self);
/// Returns a memory figure for this consumer; the default implementation
/// returns 0.
// NOTE(review): presumably bytes of memory currently in use -- confirm.
fn memory(&self) -> usize {
0
}
}
// NOTE(review): not used in this part of the file; presumably an upper bound
// on the number of duplicates (weight) of a single record -- confirm at the
// use sites.
pub const MAX_DUPLICATES: i64 = 1_000_000;
// NOTE(review): not used in this part of the file; presumably the maximum
// length of a record fragment quoted in an error message -- confirm at the
// use sites. Nothing in `ParseError` below applies this limit itself.
pub const MAX_RECORD_LEN_IN_ERRMSG: usize = 4096;
/// Error produced while parsing input data.
///
/// A thin wrapper around a boxed [`ParseErrorInner`], which keeps the error
/// value itself pointer-sized. Serializes transparently as the inner value.
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
#[serde(transparent)]
pub struct ParseError(Box<ParseErrorInner>);

impl Display for ParseError {
    /// Formats the error exactly as the wrapped [`ParseErrorInner`] does.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let Self(inner) = self;
        Display::fmt(&**inner, f)
    }
}

impl StdError for ParseError {}
impl ParseError {
pub fn new(
description: String,
event_number: Option<u64>,
field: Option<String>,
invalid_text: Option<&str>,
invalid_bytes: Option<&[u8]>,
suggestion: Option<Cow<'static, str>>,
error_tag: Option<String>,
) -> Self {
Self(Box::new(ParseErrorInner::new(
description,
event_number,
field,
invalid_text,
invalid_bytes,
suggestion,
error_tag,
)))
}
pub fn text_event_error<E>(
msg: &str,
error: E,
event_number: u64,
invalid_text: Option<&str>,
suggestion: Option<Cow<'static, str>>,
) -> Self
where
E: ToString,
{
Self(Box::new(ParseErrorInner::text_event_error(
msg,
error,
event_number,
invalid_text,
suggestion,
)))
}
pub fn text_envelope_error(
description: String,
invalid_text: &str,
suggestion: Option<Cow<'static, str>>,
) -> Self {
Self(Box::new(ParseErrorInner::text_envelope_error(
description,
invalid_text,
suggestion,
)))
}
pub fn bin_event_error(
description: String,
event_number: u64,
invalid_bytes: &[u8],
suggestion: Option<Cow<'static, str>>,
) -> Self {
Self(Box::new(ParseErrorInner::bin_event_error(
description,
event_number,
invalid_bytes,
suggestion,
)))
}
pub fn bin_envelope_error(
description: String,
invalid_bytes: &[u8],
suggestion: Option<Cow<'static, str>>,
) -> Self {
Self(Box::new(ParseErrorInner::bin_envelope_error(
description,
invalid_bytes,
suggestion,
)))
}
pub fn map_description<F>(self, f: F) -> Self
where
F: FnOnce(&str) -> String,
{
let mut inner = self.0;
let description = f(&inner.description);
inner.description = description;
Self(inner)
}
pub fn get_error_tag(&self) -> Option<String> {
self.0.get_error_tag()
}
}
/// Payload of a [`ParseError`].
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
pub struct ParseErrorInner {
/// Human-readable description of the error.
description: String,
/// Number of the event the error refers to, when the error is tied to a
/// specific event.
// NOTE(review): whether numbering is 0- or 1-based is decided by callers.
event_number: Option<u64>,
/// Name of the field that failed to parse, when known.
field: Option<String>,
/// Offending input as raw bytes. When present it is shown by `Display` in
/// preference to `invalid_text`.
invalid_bytes: Option<Vec<u8>>,
/// Offending input as text.
invalid_text: Option<String>,
/// Optional suggestion appended to the displayed message on its own line.
suggestion: Option<Cow<'static, str>>,
/// Optional tag identifying the kind of error (for example
/// `"text_event_err"`).
tag: Option<String>,
}
impl Display for ParseErrorInner {
    /// Renders the error as
    /// `Parse error[ (event #N)]: <description>`, followed on separate lines
    /// by the invalid input (bytes take precedence over text) and the
    /// suggestion, when present.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let event = self
            .event_number
            .map(|event_number| format!(" (event #{})", event_number))
            .unwrap_or_default();

        // At most one of the two fragments is shown; raw bytes win when both
        // are available.
        let invalid_fragment = match (&self.invalid_bytes, &self.invalid_text) {
            (Some(invalid_bytes), _) => format!("\nInvalid bytes: {invalid_bytes:?}"),
            (None, Some(invalid_text)) => format!("\nInvalid fragment: '{invalid_text}'"),
            (None, None) => String::new(),
        };

        let suggestion = self
            .suggestion
            .as_ref()
            .map(|suggestion| format!("\n{suggestion}"))
            .unwrap_or_default();

        write!(
            f,
            "Parse error{event}: {}{invalid_fragment}{suggestion}",
            self.description
        )
    }
}
impl ParseErrorInner {
    /// Creates an error from its individual components, copying the borrowed
    /// text and bytes into owned storage.
    pub fn new(
        description: String,
        event_number: Option<u64>,
        field: Option<String>,
        invalid_text: Option<&str>,
        invalid_bytes: Option<&[u8]>,
        suggestion: Option<Cow<'static, str>>,
        error_tag: Option<String>,
    ) -> Self {
        let invalid_text = invalid_text.map(str::to_owned);
        let invalid_bytes = invalid_bytes.map(<[u8]>::to_vec);
        Self {
            description,
            event_number,
            field,
            invalid_bytes,
            invalid_text,
            suggestion,
            tag: error_tag,
        }
    }

    /// Creates an error for a single event of a text-based format, tagged
    /// `"text_event_err"`.
    ///
    /// `error` is converted to a string. If that string contains a JSON
    /// object starting with `{"field":` that deserializes as a
    /// [`FieldParseError`], the field name and description are taken from
    /// it; otherwise the whole string is used as the description and no
    /// field is recorded. The final description is
    /// `<msg>[: error parsing field '<field>']: <description>`.
    pub fn text_event_error<E>(
        msg: &str,
        error: E,
        event_number: u64,
        invalid_text: Option<&str>,
        suggestion: Option<Cow<'static, str>>,
    ) -> Self
    where
        E: ToString,
    {
        let err_str = error.to_string();

        // Try to recover a structured field error embedded in the message.
        // Only the first JSON value starting at the marker is considered.
        let embedded = err_str.find("{\"field\":").and_then(|offset| {
            serde_json::Deserializer::from_str(&err_str[offset..])
                .into_iter::<FieldParseError>()
                .next()
                .and_then(Result::ok)
        });

        let (descr, field) = match embedded {
            Some(err) => (err.description, Some(err.field)),
            None => (err_str, None),
        };

        let column_name = match &field {
            Some(field) => format!(": error parsing field '{field}'"),
            None => String::new(),
        };

        Self::new(
            format!("{msg}{column_name}: {descr}",),
            Some(event_number),
            field,
            invalid_text,
            None,
            suggestion,
            Some("text_event_err".to_string()),
        )
    }

    /// Creates an error with no event number that carries the offending
    /// text, tagged `"text_envelope_err"`.
    pub fn text_envelope_error(
        description: String,
        invalid_text: &str,
        suggestion: Option<Cow<'static, str>>,
    ) -> Self {
        let tag = "text_envelope_err".to_string();
        Self::new(
            description,
            None,
            None,
            Some(invalid_text),
            None,
            suggestion,
            Some(tag),
        )
    }

    /// Creates an error for a single event of a binary format, carrying the
    /// offending bytes and tagged `"bin_event_err"`.
    pub fn bin_event_error(
        description: String,
        event_number: u64,
        invalid_bytes: &[u8],
        suggestion: Option<Cow<'static, str>>,
    ) -> Self {
        let tag = "bin_event_err".to_string();
        Self::new(
            description,
            Some(event_number),
            None,
            None,
            Some(invalid_bytes),
            suggestion,
            Some(tag),
        )
    }

    /// Creates an error with no event number that carries the offending
    /// bytes, tagged `"bin_envelope_err"`.
    pub fn bin_envelope_error(
        description: String,
        invalid_bytes: &[u8],
        suggestion: Option<Cow<'static, str>>,
    ) -> Self {
        let tag = "bin_envelope_err".to_string();
        Self::new(
            description,
            None,
            None,
            None,
            Some(invalid_bytes),
            suggestion,
            Some(tag),
        )
    }

    /// Returns a copy of the error tag, if one is set.
    pub fn get_error_tag(&self) -> Option<String> {
        self.tag.as_deref().map(String::from)
    }
}