use std::sync::Arc;
use arrow_array::RecordBatch;
use laminar_connectors::checkpoint::SourceCheckpoint;
use laminar_connectors::config::ConnectorConfig;
use laminar_connectors::connector::SourceConnector;
use rustc_hash::FxHashMap;
/// Everything bundled together to register a single source: a name, the
/// connector object, its configuration, and checkpoint/replay metadata.
///
/// This type only carries data; how the fields are consumed is decided by code
/// outside this file.
pub struct SourceRegistration {
/// Name identifying this source.
pub name: String,
/// Boxed, dynamically dispatched connector implementing [`SourceConnector`].
pub connector: Box<dyn SourceConnector>,
/// Connector configuration associated with this source.
pub config: ConnectorConfig,
/// Flag recording whether the source supports replay.
// NOTE(review): presumably "replay" means re-reading from a saved offset
// after recovery — confirm against the code that reads this flag.
pub supports_replay: bool,
/// Optional checkpoint for this source; `None` when there is none.
// NOTE(review): the name suggests this is the position to restore the
// connector to on startup — confirm against the consumer.
pub restore_checkpoint: Option<SourceCheckpoint>,
}
/// Callback interface with hooks for cycle execution, result fan-out,
/// watermark handling, checkpointing, metrics, and control messages.
///
/// Implementors must be `Send + 'static`. Async methods are provided through
/// the `async_trait` attribute macro.
///
/// Only `update_mv_stores`, `is_backpressured`, and `has_deferred_input` have
/// default bodies; every other method must be supplied by the implementor.
// NOTE(review): the caller of this trait is not visible in this file, so the
// order in which these hooks are invoked is not documented here. Per-method
// notes marked NOTE(review) are inferred from names/signatures — confirm them.
#[async_trait::async_trait]
pub trait PipelineCallback: Send + 'static {
/// Runs one cycle over the given batches at the given watermark.
///
/// # Parameters
/// - `source_batches`: record batches grouped under `Arc<str>` keys.
/// - `watermark`: an `i64` watermark value passed in by the caller.
///
/// # Errors
/// Returns `Err(String)` carrying an error message on failure.
// NOTE(review): input keys are presumably source names and output keys
// presumably result/stream names — confirm against an implementor.
async fn execute_cycle(
&mut self,
source_batches: &FxHashMap<Arc<str>, Vec<RecordBatch>>,
watermark: i64,
) -> Result<FxHashMap<Arc<str>, Vec<RecordBatch>>, String>;
/// Receives a map of result batches by shared reference.
// NOTE(review): presumably forwards results to downstream streams.
fn push_to_streams(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);
/// Receives a map of result batches by shared reference.
///
/// The default implementation does nothing, so implementors may leave it
/// unimplemented.
// NOTE(review): "mv" presumably stands for materialized view — confirm.
fn update_mv_stores(&self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>) {
// Explicitly discard the argument; the default is a no-op.
let _ = results;
}
/// Asynchronously receives a map of result batches; takes `&mut self`.
///
/// Returns nothing, so any failure must be handled by the implementor.
async fn write_to_sinks(&mut self, results: &FxHashMap<Arc<str>, Vec<RecordBatch>>);
/// Inspects `batch` for the source named `source_name`; may mutate `self`.
// NOTE(review): presumably advances the tracked watermark from the batch's
// event-time data — confirm against an implementor.
fn extract_watermark(&mut self, source_name: &str, batch: &RecordBatch);
/// Produces an optional batch derived from `batch` for `source_name`.
// NOTE(review): presumably drops rows older than the watermark; the meaning
// of `None` (nothing left vs. nothing filtered) is not visible here —
// TODO confirm.
fn filter_late_rows(&self, source_name: &str, batch: &RecordBatch) -> Option<RecordBatch>;
/// Returns the current watermark as an `i64`.
fn current_watermark(&self) -> i64;
/// Asynchronous checkpoint hook taking a `force` flag and a map of
/// per-source checkpoints keyed by `String`, passed by value.
// NOTE(review): the returned `bool` presumably reports whether a checkpoint
// was actually taken — confirm.
async fn maybe_checkpoint(
&mut self,
force: bool,
source_offsets: FxHashMap<String, SourceCheckpoint>,
) -> bool;
/// Asynchronous checkpoint hook taking a map of per-source checkpoints
/// keyed by `String`, passed by value.
// NOTE(review): presumably the barrier-aligned checkpoint path; the
// returned `bool` presumably signals success — confirm.
async fn checkpoint_with_barrier(
&mut self,
source_checkpoints: FxHashMap<String, SourceCheckpoint>,
) -> bool;
/// Reports the counters for one cycle: events ingested, batch count, and
/// elapsed time.
// NOTE(review): `elapsed_ns` is presumably in nanoseconds, per its name.
fn record_cycle(&self, events_ingested: u64, batches: u64, elapsed_ns: u64);
/// Asynchronous hook with no arguments and no return value; takes
/// `&mut self`.
// NOTE(review): presumably refreshes reference/lookup tables — confirm.
async fn poll_tables(&mut self);
/// Handles a control message of type `ControlMsg` from the parent module.
fn apply_control(&mut self, msg: super::ControlMsg);
/// Reports whether the implementor is backpressured.
///
/// The default implementation always returns `false`.
fn is_backpressured(&self) -> bool {
false
}
/// Reports whether the implementor holds deferred input.
///
/// The default implementation always returns `false`.
fn has_deferred_input(&self) -> bool {
false
}
}