use anyhow::{Error as AnyError, Result as AnyResult};
use chrono::{DateTime, Utc};
use dyn_clone::DynClone;
use feldera_types::config::FtModel;
use feldera_types::program_schema::Relation;
use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError};
use serde::Deserialize;
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
use std::collections::VecDeque;
use std::fmt::Display;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::TryRecvError;
use xxhash_rust::xxh3::Xxh3Default;
use crate::PipelineState;
use crate::catalog::InputCollectionHandle;
use crate::format::{BufferSize, InputBuffer, ParseError, Parser};
use crate::metrics::ConnectorMetrics;
/// Step number of the pipeline, as passed to [`OutputEndpoint::batch_start`].
pub type Step = u64;
/// Functionality shared by every kind of input endpoint.
pub trait InputEndpoint: Send {
    /// Returns the fault-tolerance model supported by this endpoint.
    ///
    /// `None` presumably means the endpoint offers no fault-tolerance
    /// guarantee — confirm against implementors.
    fn fault_tolerance(&self) -> Option<FtModel>;
}
/// An input endpoint whose data is decoded by a separate [`Parser`] that is
/// supplied when the endpoint is opened.
pub trait TransportInputEndpoint: InputEndpoint {
    /// Opens the endpoint and returns a handle for controlling the reader.
    ///
    /// * `consumer` - receives buffering, step, transaction and error
    ///   notifications (see [`InputConsumer`]).
    /// * `parser` - decodes the data read by the endpoint.
    /// * `schema` - the relation this endpoint feeds.
    /// * `resume_info` - connector state saved in a checkpoint, if resuming;
    ///   typically decoded with [`parse_resume_info`].
    fn open(
        &self,
        consumer: Box<dyn InputConsumer>,
        parser: Box<dyn Parser>,
        schema: Relation,
        resume_info: Option<JsonValue>,
    ) -> AnyResult<Box<dyn InputReader>>;
}
/// An input endpoint that is opened directly against an
/// [`InputCollectionHandle`] instead of going through a separate [`Parser`].
#[doc(hidden)]
pub trait IntegratedInputEndpoint: InputEndpoint {
    /// Opens the endpoint, consuming it, and returns a handle for controlling
    /// the reader.
    ///
    /// `resume_info` is connector state saved in a checkpoint, if resuming.
    fn open(
        self: Box<Self>,
        input_handle: &InputCollectionHandle,
        resume_info: Option<JsonValue>,
    ) -> AnyResult<Box<dyn InputReader>>;
}
/// A command sent to an input reader through [`InputReader::request`].
#[derive(Debug)]
pub enum InputReaderCommand {
    /// Replay previously recorded input described by `metadata` (JSON) and
    /// `data` (a MessagePack value). Has no non-fault-tolerant equivalent.
    Replay { metadata: JsonValue, data: RmpValue },
    /// Start or resume reading; a non-fault-tolerant reader sees this as a
    /// transition to [`PipelineState::Running`].
    Extend,
    /// Pause reading; a transition to [`PipelineState::Paused`] for
    /// non-fault-tolerant readers.
    Pause,
    /// Hand queued input to the consumer (presumably via something like
    /// `InputQueue::queue` — confirm per reader). The meaning of
    /// `checkpoint_requested` is defined by the caller; it is dropped when
    /// converting with [`InputReaderCommand::as_nonft`].
    Queue { checkpoint_requested: bool },
    /// Stop the reader; a transition to [`PipelineState::Terminated`] for
    /// non-fault-tolerant readers.
    Disconnect,
}
impl InputReaderCommand {
    /// Translates this command into its form for a reader without fault
    /// tolerance.
    ///
    /// `Replay` has no such form and yields `None`. `Queue` loses its
    /// `checkpoint_requested` flag. `Extend`, `Pause` and `Disconnect`
    /// become transitions to `Running`, `Paused` and `Terminated`,
    /// respectively.
    pub fn as_nonft(&self) -> Option<NonFtInputReaderCommand> {
        let target_state = match self {
            Self::Replay { .. } => return None,
            Self::Queue { .. } => return Some(NonFtInputReaderCommand::Queue),
            Self::Extend => PipelineState::Running,
            Self::Pause => PipelineState::Paused,
            Self::Disconnect => PipelineState::Terminated,
        };
        Some(NonFtInputReaderCommand::Transition(target_state))
    }
}
/// Reduced command set for input readers that do not support fault
/// tolerance; produced by [`InputReaderCommand::as_nonft`].
#[derive(Debug)]
pub enum NonFtInputReaderCommand {
    /// Hand queued input to the consumer.
    Queue,
    /// Move the reader to the given pipeline state.
    Transition(PipelineState),
}
/// One entry of an [`InputQueue`]: an optional parsed buffer plus the
/// metadata that travels with it.
#[doc(hidden)]
pub struct InputQueueEntry<A, B> {
    /// Parsed data, or `None` for an entry that carries only metadata.
    buffer: Option<B>,
    /// Timestamp recorded for this entry by the producer.
    timestamp: DateTime<Utc>,
    /// `Some(label)` asks the queue to start a transaction (with an optional
    /// label) before this entry's buffer is flushed; `None` starts nothing.
    start_transaction: Option<Option<String>>,
    /// Whether to commit the current transaction after this entry is flushed.
    commit_transaction: bool,
    /// Caller-defined auxiliary data returned by the `flush_with_aux*`
    /// methods.
    aux: A,
}
impl<A, B> InputQueueEntry<A, B> {
    /// Creates an entry that holds only `timestamp` and `aux`: no buffer and
    /// no transaction markers.
    #[doc(hidden)]
    pub fn new_with_aux(timestamp: DateTime<Utc>, aux: A) -> Self {
        let buffer = None;
        let start_transaction = None;
        let commit_transaction = false;
        Self {
            aux,
            commit_transaction,
            start_transaction,
            timestamp,
            buffer,
        }
    }

    /// Returns this entry with its buffer replaced by `buffer`.
    #[doc(hidden)]
    pub fn with_buffer(mut self, buffer: Option<B>) -> Self {
        self.buffer = buffer;
        self
    }

    /// Returns this entry with its start-transaction marker replaced.
    ///
    /// `Some(label)` requests a transaction start (optionally labelled);
    /// `None` requests nothing.
    pub fn with_start_transaction(mut self, start_transaction: Option<Option<String>>) -> Self {
        self.start_transaction = start_transaction;
        self
    }

    /// Returns this entry with its commit-transaction flag replaced.
    pub fn with_commit_transaction(mut self, commit_transaction: bool) -> Self {
        self.commit_transaction = commit_transaction;
        self
    }
}
/// A queue of parsed input buffers waiting to be flushed with
/// [`InputBuffer::flush`], together with the transaction state of the
/// consumer it feeds.
///
/// `A` is auxiliary per-entry data returned by the `flush_with_aux*`
/// methods; `B` is the buffer type.
pub struct InputQueue<A = (), B = Box<dyn InputBuffer>> {
    /// Pending entries, oldest at the front.
    // The nested generic type trips clippy's type-complexity lint; a type
    // alias would not make it any clearer here.
    #[allow(clippy::type_complexity)]
    pub queue: Mutex<VecDeque<InputQueueEntry<A, B>>>,
    /// Receives buffering, step and transaction notifications.
    pub consumer: Box<dyn InputConsumer>,
    /// True between a transaction start and its commit issued through this
    /// queue.
    pub transaction_in_progress: AtomicBool,
}
impl<A, B: InputBuffer> InputQueue<A, B> {
    /// Creates an empty queue that reports to `consumer`, with no transaction
    /// in progress.
    pub fn new(consumer: Box<dyn InputConsumer>) -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            consumer,
            transaction_in_progress: AtomicBool::new(false),
        }
    }

    /// Appends `entry` to the back of the queue.
    ///
    /// `errors` are reported to the consumer first. The entry's size (empty
    /// if it has no buffer) is then reported with [`InputConsumer::buffered`].
    /// An entry with zero records additionally triggers
    /// [`InputConsumer::request_step`].
    pub fn push_entry(&self, entry: InputQueueEntry<A, B>, errors: Vec<ParseError>) {
        self.consumer.parse_errors(errors);
        let len = entry
            .buffer
            .as_ref()
            .map_or(BufferSize::empty(), |buffer| buffer.len());
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(entry);
        // NOTE(review): reported while the queue lock is still held, presumably
        // so a concurrent flush cannot consume the entry before it has been
        // counted — confirm.
        self.consumer.buffered(len);
        // An entry with no records adds nothing to the buffered count, so it
        // presumably would not otherwise prompt a step — confirm.
        if len.records == 0 {
            self.consumer.request_step();
        }
    }

    /// Builds an entry with no transaction markers from `buffer`, `timestamp`
    /// and `aux`, and appends it with [`Self::push_entry`].
    pub fn push_with_aux(
        &self,
        (buffer, errors): (Option<B>, Vec<ParseError>),
        timestamp: DateTime<Utc>,
        aux: A,
    ) {
        let entry = InputQueueEntry::new_with_aux(timestamp, aux).with_buffer(buffer);
        self.push_entry(entry, errors);
    }

    /// Same as [`Self::flush_with_aux_until`] with a predicate that never
    /// stops early.
    // The tuple return type trips clippy's type-complexity lint.
    #[allow(clippy::type_complexity)]
    pub fn flush_with_aux(&self) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
        self.flush_with_aux_until(&|_| false)
    }

    /// Flushes whole entries from the front of the queue.
    ///
    /// The main loop pops entries while fewer than `max_batch_size` records
    /// have been flushed. It stops early when the queue is empty, when
    /// `stop_at` returns true for an entry's `aux`, or after an entry whose
    /// commit actually committed a transaction. Buffers are never split, so
    /// the total may exceed the batch size.
    ///
    /// Next, unless `stop_at` fired, entries at the front that have no buffer
    /// are also consumed, so their `aux` values are reported.
    ///
    /// Returns:
    /// * the total size flushed;
    /// * the consumer's hasher, fed with every flushed buffer (`None` if the
    ///   consumer provides no hasher);
    /// * `(timestamp, aux)` for every consumed entry, in queue order.
    // The tuple return type trips clippy's type-complexity lint.
    #[allow(clippy::type_complexity)]
    pub fn flush_with_aux_until(
        &self,
        stop_at: &dyn Fn(&A) -> bool,
    ) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
        let mut total = BufferSize::empty();
        let mut hasher = self.consumer.hasher();
        let n = self.consumer.max_batch_size();
        let mut consumed_aux = Vec::new();
        let mut stop = false;
        while !stop && total.records < n {
            // The lock is taken only for the pop, not while flushing.
            let Some(InputQueueEntry {
                buffer,
                timestamp,
                aux,
                start_transaction,
                commit_transaction,
            }) = self.queue.lock().unwrap().pop_front()
            else {
                break;
            };
            if let Some(label) = start_transaction {
                self.start_transaction(label.as_deref());
            }
            if let Some(mut buffer) = buffer {
                total += buffer.len();
                // Hash before flushing.
                if let Some(hasher) = hasher.as_mut() {
                    buffer.hash(hasher);
                }
                buffer.flush();
            }
            stop = stop_at(&aux);
            consumed_aux.push((timestamp, aux));
            // NOTE(review): this `break` only leaves the first loop. If `stop`
            // is still false, the drain below keeps consuming buffer-less
            // entries. One of those could start a new transaction in the same
            // flush that just committed. Intended? Deferring them instead
            // could stall them, since empty entries only request a step when
            // they are pushed — confirm with the controller's semantics.
            if commit_transaction && self.commit_transaction() {
                break;
            }
        }
        // Drain buffer-less entries at the front while holding the lock, so
        // their aux values are reported in this flush.
        let mut queue = self.queue.lock().unwrap();
        while !stop
            && queue
                .front()
                .is_some_and(|InputQueueEntry { buffer, .. }| buffer.is_none())
        {
            let Some(InputQueueEntry {
                timestamp,
                aux,
                start_transaction,
                commit_transaction,
                ..
            }) = queue.pop_front()
            else {
                break;
            };
            if let Some(label) = start_transaction {
                self.start_transaction(label.as_deref());
            }
            stop = stop_at(&aux);
            consumed_aux.push((timestamp, aux));
            if commit_transaction && self.commit_transaction() {
                break;
            }
        }
        (total, hasher, consumed_aux)
    }

    /// Number of queued entries (not records).
    pub fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Whether no entries are queued.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Starts a transaction on the consumer unless one is already in
    /// progress. Returns whether a transaction was started.
    fn start_transaction(&self, label: Option<&str>) -> bool {
        // Compare-and-swap so only the caller that flips false -> true
        // notifies the consumer.
        if self
            .transaction_in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.consumer.start_transaction(label);
            true
        } else {
            false
        }
    }

    /// Commits the in-progress transaction on the consumer, if there is one.
    /// Returns whether a transaction was committed.
    fn commit_transaction(&self) -> bool {
        // Compare-and-swap so only the caller that flips true -> false
        // notifies the consumer.
        if self
            .transaction_in_progress
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.consumer.commit_transaction();
            true
        } else {
            false
        }
    }
}
impl InputQueue<(), Box<dyn InputBuffer>> {
    /// Appends a parsed buffer and its parse errors, with no auxiliary data.
    ///
    /// Equivalent to `push_with_aux` with `aux = ()`.
    pub fn push(
        &self,
        (buffer, errors): (Option<Box<dyn InputBuffer>>, Vec<ParseError>),
        timestamp: DateTime<Utc>,
    ) {
        self.push_with_aux((buffer, errors), timestamp, ())
    }

    /// Flushes up to `max_batch_size` records from the front of the queue and
    /// reports them with [`InputConsumer::extended`] (with no resume
    /// information).
    ///
    /// Unlike `flush_with_aux`, a buffer that does not fit in the remaining
    /// batch budget is split. The surplus goes back to the front of the queue,
    /// keeping the entry's commit flag but not its start marker (that
    /// transaction has already been started), and is flushed by a later
    /// call. Processing ends after any entry flagged for commit, so at most
    /// one transaction boundary is closed per call. One [`Watermark`] is
    /// reported per flushed (possibly partial) buffer.
    pub fn queue(&self) {
        let limit = self.consumer.max_batch_size();
        let mut total = BufferSize::empty();
        let mut watermarks = Vec::new();

        while total.records < limit {
            // The lock is only held for the pop, not while flushing.
            let Some(entry) = self.queue.lock().unwrap().pop_front() else {
                break;
            };
            let InputQueueEntry {
                buffer,
                timestamp,
                start_transaction,
                commit_transaction,
                ..
            } = entry;

            // Reuse the shared helper so the compare-and-swap transaction
            // logic lives in one place; it is a no-op if a transaction is
            // already in progress.
            if let Some(label) = start_transaction {
                self.start_transaction(label.as_deref());
            }

            if let Some(mut buffer) = buffer {
                let mut chunk = buffer.take_some(limit - total.records);
                total += chunk.len();
                watermarks.push(Watermark::new(timestamp, None));
                chunk.flush();
                drop(chunk);
                if !buffer.is_empty() {
                    // The remainder still owns this entry's commit, so the
                    // commit must wait until it has been flushed.
                    self.queue.lock().unwrap().push_front(InputQueueEntry {
                        buffer: Some(buffer),
                        timestamp,
                        start_transaction: None,
                        commit_transaction,
                        aux: (),
                    });
                    break;
                }
            }

            if commit_transaction {
                // End the batch at a commit marker even if no transaction was
                // actually in progress.
                self.commit_transaction();
                break;
            }
        }

        self.consumer.extended(total, None, watermarks);
    }
}
/// Handle for controlling a running input reader.
///
/// Implementors supply [`InputReader::request`] and
/// [`InputReader::is_closed`]. The other command methods are conveniences
/// that each send one [`InputReaderCommand`] through `request`.
pub trait InputReader: Send + Sync {
    /// Delivers `command` to the reader.
    fn request(&self, command: InputReaderCommand);
    /// Returns whether the reader has closed.
    fn is_closed(&self) -> bool;
    /// Sends [`InputReaderCommand::Replay`] with the given recorded state.
    fn replay(&self, metadata: JsonValue, data: RmpValue) {
        self.request(InputReaderCommand::Replay { metadata, data });
    }
    /// Sends [`InputReaderCommand::Extend`].
    fn extend(&self) {
        self.request(InputReaderCommand::Extend);
    }
    /// Sends [`InputReaderCommand::Pause`].
    fn pause(&self) {
        self.request(InputReaderCommand::Pause);
    }
    /// Sends [`InputReaderCommand::Queue`] with `checkpoint_requested`.
    fn queue(&self, checkpoint_requested: bool) {
        self.request(InputReaderCommand::Queue {
            checkpoint_requested,
        });
    }
    /// Sends [`InputReaderCommand::Disconnect`].
    fn disconnect(&self) {
        self.request(InputReaderCommand::Disconnect);
    }
    /// Memory used by the reader (presumably in bytes — confirm). Defaults
    /// to 0 for readers that do not track it.
    fn memory(&self) -> usize {
        0
    }
}
/// A progress marker passed to [`InputConsumer::extended`] along with
/// flushed input.
#[derive(Clone, Debug)]
pub struct Watermark {
    /// Timestamp associated with the flushed input; `InputQueue::queue` uses
    /// the queue entry's timestamp.
    pub timestamp: DateTime<Utc>,
    /// Optional metadata; `InputQueue::queue` always passes `None`.
    pub metadata: Option<JsonValue>,
}
impl Watermark {
    /// Creates a watermark from its two fields.
    pub fn new(timestamp: DateTime<Utc>, metadata: Option<JsonValue>) -> Self {
        Self {
            timestamp,
            metadata,
        }
    }
}
/// Receives data and notifications from an input endpoint.
///
/// `Box<dyn InputConsumer>` is cloneable through [`DynClone`].
pub trait InputConsumer: Send + Sync + DynClone {
    /// Record budget per flush; `InputQueue` stops taking input once this
    /// many records have been flushed.
    fn max_batch_size(&self) -> usize;

    /// Fault-tolerance model of the pipeline, if any.
    fn pipeline_fault_tolerance(&self) -> Option<FtModel>;

    /// Returns a new hasher when the pipeline runs with
    /// [`FtModel::ExactlyOnce`], otherwise `None`.
    ///
    /// `InputQueue::flush_with_aux_until` feeds every flushed buffer into it.
    fn hasher(&self) -> Option<Xxh3Default> {
        let exactly_once = matches!(
            self.pipeline_fault_tolerance(),
            Some(FtModel::ExactlyOnce)
        );
        if exactly_once {
            Some(Xxh3Default::new())
        } else {
            None
        }
    }

    /// Reports errors produced while parsing input.
    fn parse_errors(&self, errors: Vec<ParseError>);

    /// Reports that `amt` more input has been queued.
    fn buffered(&self, amt: BufferSize);

    /// Reports that `amt` input was replayed, with the given content hash.
    fn replayed(&self, amt: BufferSize, hash: u64);

    /// Reports that `amt` input was flushed, with optional resume information
    /// and the watermarks of the flushed input.
    fn extended(&self, amt: BufferSize, resume: Option<Resume>, watermarks: Vec<Watermark>);

    /// Reports end of input.
    fn eoi(&self);

    /// Asks for a pipeline step to be scheduled.
    fn request_step(&self);

    /// Marks the start of a transaction, with an optional label.
    fn start_transaction(&self, label: Option<&str>);

    /// Marks the commit of the current transaction.
    fn commit_transaction(&self);

    /// Installs connector-specific metrics; ignored by default.
    fn set_custom_metrics(&self, _metrics: Arc<dyn ConnectorMetrics>) {}

    /// Reports an error; `fatal` marks unrecoverable errors and `tag`
    /// optionally categorizes it.
    fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>);
}
/// Information reported with a flush that lets a connector resume after a
/// restart.
#[derive(Clone, Debug)]
pub enum Resume {
    /// No seek position is carried; counts as at-least-once.
    Barrier,
    /// A JSON seek position (presumably where to resume reading — confirm);
    /// counts as at-least-once.
    Seek {
        seek: JsonValue,
    },
    /// A seek position plus the recorded data (`replay`) and its `hash`;
    /// counts as exactly-once.
    Replay {
        seek: JsonValue,
        replay: RmpValue,
        hash: u64,
    },
}
impl Resume {
    /// True only for [`Resume::Barrier`].
    pub fn is_barrier(&self) -> bool {
        match self {
            Self::Barrier => true,
            Self::Seek { .. } | Self::Replay { .. } => false,
        }
    }

    /// Borrows the seek position, if this variant carries one.
    pub fn seek(&self) -> Option<&JsonValue> {
        match self {
            Self::Seek { seek } | Self::Replay { seek, .. } => Some(seek),
            Self::Barrier => None,
        }
    }

    /// Consumes `self` and returns the seek position, if any.
    pub fn into_seek(self) -> Option<JsonValue> {
        match self {
            Self::Seek { seek } | Self::Replay { seek, .. } => Some(seek),
            Self::Barrier => None,
        }
    }

    /// Exactly-once for [`Resume::Replay`]; at-least-once otherwise.
    pub fn fault_tolerance(&self) -> FtModel {
        match self {
            Self::Replay { .. } => FtModel::ExactlyOnce,
            Self::Barrier | Self::Seek { .. } => FtModel::AtLeastOnce,
        }
    }

    /// Builds resume information from a seek position alone.
    ///
    /// With a `hash`, the result is a `Replay` with empty (`Nil`) replay
    /// data; without one, it is a plain `Seek`.
    pub fn new_metadata_only(seek: JsonValue, hash: Option<u64>) -> Self {
        if let Some(hash) = hash {
            Self::Replay {
                seek,
                replay: RmpValue::Nil,
                hash,
            }
        } else {
            Self::Seek { seek }
        }
    }

    /// Builds resume information from replay data alone, with a null seek
    /// position.
    ///
    /// `replay` is only invoked when a `hash` is given. Without one, the
    /// result is a `Seek` with a null position.
    pub fn new_data_only<F>(replay: F, hash: Option<u64>) -> Self
    where
        F: FnOnce() -> RmpValue,
    {
        let Some(hash) = hash else {
            return Self::Seek {
                seek: JsonValue::Null,
            };
        };
        Self::Replay {
            seek: JsonValue::Null,
            replay: replay(),
            hash,
        }
    }
}
// Implements `Clone` for `Box<dyn InputConsumer>` using the `DynClone`
// supertrait.
dyn_clone::clone_trait_object!(InputConsumer);
/// Deserializes connector state saved in a checkpoint into `M`.
///
/// # Errors
///
/// Fails if `metadata` does not deserialize as `M`. The error message quotes
/// both the checkpointed state and the underlying parse error.
pub fn parse_resume_info<M>(metadata: &JsonValue) -> AnyResult<M>
where
    M: DeserializeOwned,
{
    // `from_value` takes ownership, so deserialize from a copy and keep the
    // original around for the error message.
    let owned = metadata.clone();
    match serde_json_path_to_error::from_value::<M>(owned) {
        Ok(state) => Ok(state),
        Err(e) => Err(anyhow::anyhow!(
            "unable to parse checkpointed connector state (checkpointed state: {metadata}; parse error: {e})"
        )),
    }
}
/// Callback an output endpoint uses to report errors that occur after
/// [`OutputEndpoint::connect`] returns.
///
/// The arguments mirror [`InputConsumer::error`]: presumably `fatal`, the
/// error, and an optional tag — confirm against callers.
#[doc(hidden)]
pub type AsyncErrorCallback = Box<dyn Fn(bool, AnyError, Option<&'static str>) + Send + Sync>;
/// A destination for encoded output produced by the pipeline.
pub trait OutputEndpoint: Send {
    /// Connects the endpoint. Errors that occur later are reported through
    /// `async_error_callback`.
    fn connect(&mut self, async_error_callback: AsyncErrorCallback) -> AnyResult<()>;

    /// Maximum buffer size, in bytes; presumably an upper bound on what
    /// callers pass to [`OutputEndpoint::push_buffer`] — confirm.
    fn max_buffer_size_bytes(&self) -> usize;

    /// Called before output for `_step` is pushed. Does nothing by default.
    fn batch_start(&mut self, _step: Step) -> AnyResult<()> {
        Ok(())
    }

    /// Sends one encoded buffer.
    fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<()>;

    /// Sends one message made of an optional key, an optional value, and
    /// headers (each with an optional value).
    fn push_key(
        &mut self,
        key: Option<&[u8]>,
        val: Option<&[u8]>,
        headers: &[(&str, Option<&[u8]>)],
    ) -> AnyResult<()>;

    /// Called after a batch's output has been pushed. Does nothing by
    /// default.
    fn batch_end(&mut self) -> AnyResult<()> {
        Ok(())
    }

    /// Whether this endpoint supports fault-tolerant output.
    fn is_fault_tolerant(&self) -> bool;

    /// Memory used by the endpoint (presumably in bytes — confirm). Defaults
    /// to 0.
    fn memory(&self) -> usize {
        0
    }
}
/// Receiving side of an input reader's command channel.
///
/// Adds a one-command push-back slot and decodes
/// [`InputReaderCommand::Replay`] payloads into metadata `M` and data `D`.
pub struct InputCommandReceiver<M, D> {
    /// Underlying channel of commands.
    receiver: UnboundedReceiver<InputReaderCommand>,
    /// A command put back with `put_back`, returned before reading the
    /// channel again.
    buffer: Option<InputReaderCommand>,
    /// Records the `M`/`D` types without storing values of them.
    _phantom: PhantomData<(M, D)>,
}
/// Errors returned by [`InputCommandReceiver`].
#[derive(Debug)]
pub enum InputCommandReceiverError {
    /// The command channel's sender is gone.
    Disconnected,
    /// The JSON metadata of a `Replay` command did not deserialize.
    JsonDecodeError(serde_json_path_to_error::Error),
    /// The MessagePack data of a `Replay` command did not deserialize.
    RmpDecodeError(RmpDecodeError),
}
impl std::error::Error for InputCommandReceiverError {}
impl Display for InputCommandReceiverError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
InputCommandReceiverError::Disconnected => write!(f, "sender disconnected"),
InputCommandReceiverError::RmpDecodeError(e) => e.fmt(f),
InputCommandReceiverError::JsonDecodeError(e) => e.fmt(f),
}
}
}
impl From<RmpDecodeError> for InputCommandReceiverError {
fn from(value: RmpDecodeError) -> Self {
Self::RmpDecodeError(value)
}
}
impl From<serde_json_path_to_error::Error> for InputCommandReceiverError {
fn from(value: serde_json_path_to_error::Error) -> Self {
Self::JsonDecodeError(value)
}
}
impl<M, D> InputCommandReceiver<M, D> {
pub fn new(receiver: UnboundedReceiver<InputReaderCommand>) -> Self {
Self {
receiver,
buffer: None,
_phantom: PhantomData,
}
}
#[doc(hidden)]
pub fn blocking_recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
where
M: for<'a> Deserialize<'a>,
D: for<'a> Deserialize<'a>,
{
let command = self.blocking_recv()?;
self.take_replay(command)
}
#[doc(hidden)]
pub async fn recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
where
M: for<'a> Deserialize<'a>,
D: for<'a> Deserialize<'a>,
{
let command = self.recv().await?;
self.take_replay(command)
}
fn take_replay(
&mut self,
command: InputReaderCommand,
) -> Result<Option<(M, D)>, InputCommandReceiverError>
where
M: for<'a> Deserialize<'a>,
D: for<'a> Deserialize<'a>,
{
match command {
InputReaderCommand::Replay { metadata, data } => Ok(Some((
serde_json_path_to_error::from_value::<M>(metadata)?,
rmpv::ext::from_value::<D>(data)?,
))),
other => {
self.put_back(other);
Ok(None)
}
}
}
#[doc(hidden)]
pub async fn recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
match self.buffer.take() {
Some(value) => Ok(value),
None => self
.receiver
.recv()
.await
.ok_or(InputCommandReceiverError::Disconnected),
}
}
#[doc(hidden)]
pub fn blocking_recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
match self.buffer.take() {
Some(value) => Ok(value),
None => self
.receiver
.blocking_recv()
.ok_or(InputCommandReceiverError::Disconnected),
}
}
#[doc(hidden)]
pub fn try_recv(&mut self) -> Result<Option<InputReaderCommand>, InputCommandReceiverError> {
if let Some(command) = self.buffer.take() {
Ok(Some(command))
} else {
match self.receiver.try_recv() {
Ok(command) => Ok(Some(command)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(InputCommandReceiverError::Disconnected),
}
}
}
#[doc(hidden)]
pub fn put_back(&mut self, value: InputReaderCommand) {
assert!(self.buffer.is_none());
self.buffer = Some(value);
}
}