use crate::{
integrations::{
map_integration_result, run_with_contract, AdapterCallContract, BigQueryAdapter,
ClickHouseAdapter, DataWindowRequest, OpenSearchAdapter, QueryContext, RetryPolicy,
SearchRequest, SingleStoreAdapter, SqlCommand,
},
DataError, DataResult, Row, WireFormatProfile,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fs::{self, OpenOptions},
io::{BufRead, BufReader, Write},
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
},
thread,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
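/// How a pipeline is meant to be started: manually by an operator, on a
/// schedule, or in response to an upstream event.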
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineTrigger {
Manual,
Scheduled,
EventDriven,
}
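/// Static pipeline configuration: the named source/transform/sink wiring plus
/// batching, deadline, and retry budgets. Built fluently via the `with_*`
/// methods, which clamp obviously invalid values (zero batch sizes,
/// deadlines, or attempt counts) up to 1.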
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Pipeline {
pub id: String,
pub name: String,
pub source: String,
pub transform: String,
pub sink: String,
pub trigger: PipelineTrigger,
pub batch_size: usize,
pub max_batches_per_run: Option<usize>,
pub deadline_ms: u64,
pub retry_policy: RetryPolicy,
pub metadata: BTreeMap<String, String>,
}
impl Pipeline {
pub fn new(
id: impl Into<String>,
name: impl Into<String>,
source: impl Into<String>,
transform: impl Into<String>,
sink: impl Into<String>,
) -> Self {
Self {
id: id.into(),
name: name.into(),
source: source.into(),
transform: transform.into(),
sink: sink.into(),
trigger: PipelineTrigger::Manual,
batch_size: 500,
max_batches_per_run: None,
deadline_ms: 30_000,
retry_policy: RetryPolicy::conservative(),
metadata: BTreeMap::new(),
}
}
pub fn with_trigger(mut self, trigger: PipelineTrigger) -> Self {
self.trigger = trigger;
self
}
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size.max(1);
self
}
pub fn with_max_batches_per_run(mut self, max_batches_per_run: Option<usize>) -> Self {
self.max_batches_per_run = max_batches_per_run.map(|value| value.max(1));
self
}
pub fn with_deadline_ms(mut self, deadline_ms: u64) -> Self {
self.deadline_ms = deadline_ms.max(1);
self
}
pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
self.retry_policy = RetryPolicy {
max_attempts: retry_policy.max_attempts.max(1),
initial_backoff_ms: retry_policy.initial_backoff_ms,
max_backoff_ms: retry_policy
.max_backoff_ms
.max(retry_policy.initial_backoff_ms),
};
self
}
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
}
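/// The concrete command that started a run, carrying trigger-specific
/// payload: the requester, the schedule id, or the triggering event.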
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum PipelineRunCommand {
Manual { requested_by: Option<String> },
Scheduled { schedule_id: String },
Event { event: String, payload: Value },
}
impl PipelineRunCommand {
fn as_label(&self) -> &'static str {
match self {
Self::Manual { .. } => "manual",
Self::Scheduled { .. } => "scheduled",
Self::Event { .. } => "event",
}
}
}
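/// Run lifecycle states. `Paused` covers both operator pauses and runs that
/// stop after exhausting the `max_batches_per_run` budget.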
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineRunStatus {
Queued,
Running,
Paused,
Succeeded,
Failed,
Canceled,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineStage {
Source,
Transform,
Sink,
}
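/// Per-run bookkeeping: stage-by-stage row counters, checkpoint and lineage
/// tracking, retry and dead-letter totals, and accumulated error messages.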
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineRun {
pub id: String,
pub pipeline_id: String,
pub command: PipelineRunCommand,
pub status: PipelineRunStatus,
pub tenant_id: Option<String>,
pub trace_id: Option<String>,
pub started_at_unix_ms: u64,
pub finished_at_unix_ms: Option<u64>,
pub resumed_from_checkpoint: Option<String>,
pub last_checkpoint: Option<String>,
pub rows_read: u64,
pub rows_transformed: u64,
pub rows_written: u64,
pub rows_dead_letter: u64,
pub duplicate_writes: u64,
pub retries: u64,
pub latest_lineage: Option<String>,
pub latest_freshness_lag_ms: Option<u64>,
pub errors: Vec<String>,
}
impl PipelineRun {
fn new(
id: String,
pipeline_id: String,
command: PipelineRunCommand,
context: &QueryContext,
) -> Self {
Self {
id,
pipeline_id,
command,
status: PipelineRunStatus::Queued,
tenant_id: context.tenant_id.clone(),
trace_id: context.trace_id.clone(),
started_at_unix_ms: now_unix_ms(),
finished_at_unix_ms: None,
resumed_from_checkpoint: None,
last_checkpoint: None,
rows_read: 0,
rows_transformed: 0,
rows_written: 0,
rows_dead_letter: 0,
duplicate_writes: 0,
retries: 0,
latest_lineage: None,
latest_freshness_lag_ms: None,
errors: Vec::new(),
}
}
fn finish(&mut self, status: PipelineRunStatus) {
self.status = status;
self.finished_at_unix_ms = Some(now_unix_ms());
}
}
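/// The kinds of lifecycle and progress events a run emits to its
/// [`PipelineEventStore`].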
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineEventKind {
RunQueued,
RunStarted,
RunResumed,
BatchRead,
BatchTransformed,
BatchWritten,
CheckpointSaved,
Retrying,
DeadLettered,
RunPaused,
RunCanceled,
RunSucceeded,
RunFailed,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineEvent {
pub run_id: String,
pub pipeline_id: String,
pub timestamp_unix_ms: u64,
pub kind: PipelineEventKind,
pub message: String,
pub tenant_id: Option<String>,
pub trace_id: Option<String>,
pub correlation_id: Option<String>,
pub request_id: Option<String>,
pub metrics: BTreeMap<String, u64>,
pub metadata: BTreeMap<String, String>,
}
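/// One page of source records plus the token that resumes reading after it,
/// whether more pages remain, and optional lineage/freshness hints.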
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineBatch {
pub records: Vec<Row>,
pub next_checkpoint: Option<String>,
pub has_more: bool,
pub lineage: Option<String>,
pub source_freshness_unix_ms: Option<u64>,
}
impl PipelineBatch {
pub fn empty() -> Self {
Self {
records: Vec::new(),
next_checkpoint: None,
has_more: false,
lineage: None,
source_freshness_unix_ms: None,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineDeadLetterInput {
pub stage: PipelineStage,
pub reason: String,
pub record: Row,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TransformBatch {
pub records: Vec<Row>,
pub dead_letters: Vec<PipelineDeadLetterInput>,
}
impl TransformBatch {
pub fn passthrough(records: Vec<Row>) -> Self {
Self {
records,
dead_letters: Vec::new(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SinkWriteResult {
pub written: usize,
pub duplicate_writes: usize,
pub dead_letters: Vec<PipelineDeadLetterInput>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineDeadLetter {
pub run_id: String,
pub pipeline_id: String,
pub stage: PipelineStage,
pub reason: String,
pub record: Row,
pub checkpoint: Option<String>,
pub attempt: u32,
pub timestamp_unix_ms: u64,
}
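/// A paged record reader. `checkpoint` is an opaque resume token (a numeric
/// row offset for the implementations in this module); implementations
/// return the token for the *next* page so a paused run can resume without
/// re-reading earlier pages.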
pub trait Source: Send + Sync {
fn source_name(&self) -> &str;
fn read_batch(
&self,
pipeline: &Pipeline,
checkpoint: Option<&str>,
context: &QueryContext,
) -> DataResult<PipelineBatch>;
}
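/// A per-batch transformation. Individual bad records should be reported as
/// dead letters in the returned [`TransformBatch`] rather than failing the
/// whole batch.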
pub trait Transform: Send + Sync {
fn transform_name(&self) -> &str;
fn transform_batch(
&self,
pipeline: &Pipeline,
run: &PipelineRun,
records: Vec<Row>,
context: &QueryContext,
) -> DataResult<TransformBatch>;
}
#[derive(Debug, Clone, Copy, Default)]
pub struct PassthroughTransform;
impl Transform for PassthroughTransform {
fn transform_name(&self) -> &str {
"passthrough"
}
fn transform_batch(
&self,
_pipeline: &Pipeline,
_run: &PipelineRun,
records: Vec<Row>,
_context: &QueryContext,
) -> DataResult<TransformBatch> {
Ok(TransformBatch::passthrough(records))
}
}
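/// A batch writer. Sinks report duplicate writes separately from successful
/// ones so replays after a resume stay observable without double-writing.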
pub trait Sink: Send + Sync {
fn sink_name(&self) -> &str;
fn write_batch(
&self,
pipeline: &Pipeline,
run: &PipelineRun,
records: &[Row],
context: &QueryContext,
) -> DataResult<SinkWriteResult>;
}
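/// Checkpoint persistence keyed by pipeline id and tenant, so tenants resume
/// independently. The runtime only loads and saves; clearing is left to
/// operators, which is why re-running a finished pipeline reads zero new
/// rows instead of starting over.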
pub trait CheckpointStore: Send + Sync {
fn load_checkpoint(
&self,
pipeline_id: &str,
tenant_id: Option<&str>,
) -> DataResult<Option<String>>;
fn save_checkpoint(
&self,
pipeline_id: &str,
tenant_id: Option<&str>,
checkpoint: &str,
) -> DataResult<()>;
fn clear_checkpoint(&self, pipeline_id: &str, tenant_id: Option<&str>) -> DataResult<()>;
}
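/// Storage for records rejected by the transform or sink stages, retrievable
/// per run.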
pub trait DeadLetterStore: Send + Sync {
fn push_dead_letter(&self, dead_letter: PipelineDeadLetter) -> DataResult<()>;
fn list_dead_letters(&self, run_id: &str) -> DataResult<Vec<PipelineDeadLetter>>;
}
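/// Append-only log of [`PipelineEvent`]s, queryable per run.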
pub trait PipelineEventStore: Send + Sync {
fn append_event(&self, event: PipelineEvent) -> DataResult<()>;
fn list_events(&self, run_id: &str) -> DataResult<Vec<PipelineEvent>>;
}
#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
inner: Arc<Mutex<HashMap<String, String>>>,
}
impl InMemoryCheckpointStore {
fn key(pipeline_id: &str, tenant_id: Option<&str>) -> String {
format!("{}::{}", tenant_id.unwrap_or("-"), pipeline_id)
}
}
impl CheckpointStore for InMemoryCheckpointStore {
fn load_checkpoint(
&self,
pipeline_id: &str,
tenant_id: Option<&str>,
) -> DataResult<Option<String>> {
let key = Self::key(pipeline_id, tenant_id);
let guard = self
.inner
.lock()
.map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
Ok(guard.get(&key).cloned())
}
fn save_checkpoint(
&self,
pipeline_id: &str,
tenant_id: Option<&str>,
checkpoint: &str,
) -> DataResult<()> {
let key = Self::key(pipeline_id, tenant_id);
let mut guard = self
.inner
.lock()
.map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
guard.insert(key, checkpoint.to_string());
Ok(())
}
fn clear_checkpoint(&self, pipeline_id: &str, tenant_id: Option<&str>) -> DataResult<()> {
let key = Self::key(pipeline_id, tenant_id);
let mut guard = self
.inner
.lock()
.map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
guard.remove(&key);
Ok(())
}
}
#[derive(Debug, Clone, Default)]
pub struct InMemoryDeadLetterStore {
inner: Arc<Mutex<Vec<PipelineDeadLetter>>>,
}
impl DeadLetterStore for InMemoryDeadLetterStore {
fn push_dead_letter(&self, dead_letter: PipelineDeadLetter) -> DataResult<()> {
let mut guard = self
.inner
.lock()
.map_err(|_| DataError::Integration("dead-letter store lock poisoned".to_string()))?;
guard.push(dead_letter);
Ok(())
}
fn list_dead_letters(&self, run_id: &str) -> DataResult<Vec<PipelineDeadLetter>> {
let guard = self
.inner
.lock()
.map_err(|_| DataError::Integration("dead-letter store lock poisoned".to_string()))?;
Ok(guard
.iter()
.filter(|entry| entry.run_id == run_id)
.cloned()
.collect())
}
}
#[derive(Debug, Clone, Default)]
pub struct InMemoryPipelineEventStore {
inner: Arc<Mutex<Vec<PipelineEvent>>>,
}
impl PipelineEventStore for InMemoryPipelineEventStore {
fn append_event(&self, event: PipelineEvent) -> DataResult<()> {
let mut guard = self.inner.lock().map_err(|_| {
DataError::Integration("pipeline event store lock poisoned".to_string())
})?;
guard.push(event);
Ok(())
}
fn list_events(&self, run_id: &str) -> DataResult<Vec<PipelineEvent>> {
let guard = self.inner.lock().map_err(|_| {
DataError::Integration("pipeline event store lock poisoned".to_string())
})?;
Ok(guard
.iter()
.filter(|event| event.run_id == run_id)
.cloned()
.collect())
}
}
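/// Cooperative pause/cancel flags shared with a running pipeline. The
/// runtime polls them at the top of every batch iteration, so a pause or
/// cancel takes effect at the next batch boundary.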
#[derive(Debug, Clone, Default)]
pub struct PipelineControl {
paused: Arc<AtomicBool>,
canceled: Arc<AtomicBool>,
}
impl PipelineControl {
pub fn pause(&self) {
self.paused.store(true, Ordering::SeqCst);
}
pub fn resume(&self) {
self.paused.store(false, Ordering::SeqCst);
}
pub fn cancel(&self) {
self.canceled.store(true, Ordering::SeqCst);
}
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::SeqCst)
}
pub fn is_canceled(&self) -> bool {
self.canceled.load(Ordering::SeqCst)
}
}
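/// Drives one pipeline: a batched read → transform → write loop with
/// checkpointing, retries, dead-lettering, and event emission at each step.
///
/// A minimal wiring sketch using the in-memory helpers defined in this
/// module (`rows` stands in for any `Vec<Row>`; error handling elided):
///
/// ```ignore
/// let runtime = PipelineRuntime::new(
///     Pipeline::new("demo", "Demo", "in-memory", "passthrough", "sink"),
///     Arc::new(InMemoryRecordSource::new("in-memory", rows)),
///     Arc::new(PassthroughTransform),
///     Arc::new(IdempotentInMemorySink::new("sink", "id")),
///     Arc::new(InMemoryCheckpointStore::default()),
///     Arc::new(InMemoryDeadLetterStore::default()),
///     Arc::new(InMemoryPipelineEventStore::default()),
/// );
/// let run = runtime.execute_manual(QueryContext::default(), PipelineControl::default(), None)?;
/// assert_eq!(run.status, PipelineRunStatus::Succeeded);
/// ```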
pub struct PipelineRuntime {
pipeline: Pipeline,
source: Arc<dyn Source>,
transform: Arc<dyn Transform>,
sink: Arc<dyn Sink>,
checkpoint_store: Arc<dyn CheckpointStore>,
dead_letter_store: Arc<dyn DeadLetterStore>,
event_store: Arc<dyn PipelineEventStore>,
}
impl PipelineRuntime {
pub fn new(
pipeline: Pipeline,
source: Arc<dyn Source>,
transform: Arc<dyn Transform>,
sink: Arc<dyn Sink>,
checkpoint_store: Arc<dyn CheckpointStore>,
dead_letter_store: Arc<dyn DeadLetterStore>,
event_store: Arc<dyn PipelineEventStore>,
) -> Self {
Self {
pipeline,
source,
transform,
sink,
checkpoint_store,
dead_letter_store,
event_store,
}
}
pub fn execute_manual(
&self,
context: QueryContext,
control: PipelineControl,
requested_by: Option<String>,
) -> DataResult<PipelineRun> {
self.execute(
PipelineRunCommand::Manual { requested_by },
context,
control,
)
}
pub fn execute_scheduled(
&self,
context: QueryContext,
control: PipelineControl,
schedule_id: impl Into<String>,
) -> DataResult<PipelineRun> {
self.execute(
PipelineRunCommand::Scheduled {
schedule_id: schedule_id.into(),
},
context,
control,
)
}
pub fn execute_event(
&self,
context: QueryContext,
control: PipelineControl,
event: impl Into<String>,
payload: Value,
) -> DataResult<PipelineRun> {
self.execute(
PipelineRunCommand::Event {
event: event.into(),
payload,
},
context,
control,
)
}
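    /// Runs the pipeline loop until the source is exhausted or a terminal
    /// condition is reached. Each iteration: check cancel/pause/deadline and
    /// the `max_batches_per_run` budget, read one batch, transform it
    /// (dead-lettering the whole batch if transform retries are exhausted),
    /// write the survivors, then persist the source's `next_checkpoint`.
    /// The returned [`PipelineRun`] carries the counters and errors; only
    /// infrastructure failures (event, checkpoint, or dead-letter stores)
    /// surface as `Err`.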
pub fn execute(
&self,
command: PipelineRunCommand,
context: QueryContext,
control: PipelineControl,
) -> DataResult<PipelineRun> {
let started = Instant::now();
let run_id = format!("plr-{}-{}", now_unix_ms(), next_counter());
let mut run = PipelineRun::new(
run_id.clone(),
self.pipeline.id.clone(),
command.clone(),
&context,
);
self.emit_event(
&run,
&context,
PipelineEventKind::RunQueued,
"pipeline run queued",
BTreeMap::new(),
BTreeMap::new(),
)?;
run.status = PipelineRunStatus::Running;
self.emit_event(
&run,
&context,
PipelineEventKind::RunStarted,
"pipeline run started",
BTreeMap::new(),
BTreeMap::from([("command".to_string(), command.as_label().to_string())]),
)?;
let tenant_id = context.tenant_id.as_deref();
let mut checkpoint = self
.checkpoint_store
.load_checkpoint(&self.pipeline.id, tenant_id)?;
if checkpoint.is_some() {
run.resumed_from_checkpoint = checkpoint.clone();
self.emit_event(
&run,
&context,
PipelineEventKind::RunResumed,
"resuming from stored checkpoint",
BTreeMap::new(),
BTreeMap::from([(
"checkpoint".to_string(),
checkpoint.clone().unwrap_or_default(),
)]),
)?;
}
let mut batches_processed = 0usize;
loop {
if control.is_canceled() {
run.finish(PipelineRunStatus::Canceled);
self.emit_event(
&run,
&context,
PipelineEventKind::RunCanceled,
"pipeline run canceled by operator control",
BTreeMap::new(),
BTreeMap::new(),
)?;
break;
}
if control.is_paused() {
run.finish(PipelineRunStatus::Paused);
self.emit_event(
&run,
&context,
PipelineEventKind::RunPaused,
"pipeline run paused by operator control",
BTreeMap::new(),
BTreeMap::new(),
)?;
break;
}
if started.elapsed() > Duration::from_millis(self.pipeline.deadline_ms.max(1)) {
let message = format!(
"pipeline run deadline exceeded ({}ms)",
self.pipeline.deadline_ms
);
run.errors.push(message.clone());
run.finish(PipelineRunStatus::Failed);
self.emit_event(
&run,
&context,
PipelineEventKind::RunFailed,
&message,
BTreeMap::new(),
BTreeMap::new(),
)?;
break;
}
if let Some(max_batches) = self.pipeline.max_batches_per_run {
if batches_processed >= max_batches {
run.finish(PipelineRunStatus::Paused);
self.emit_event(
&run,
&context,
PipelineEventKind::RunPaused,
"pipeline run paused at max_batches_per_run budget",
BTreeMap::from([("max_batches".to_string(), max_batches as u64)]),
BTreeMap::new(),
)?;
break;
}
}
let source_batch =
match self.with_retry(PipelineStage::Source, &context, &mut run, |attempt| {
self.source
.read_batch(&self.pipeline, checkpoint.as_deref(), &context)
.map_err(|err| {
DataError::Integration(format!(
"source read failed at attempt {}: {}",
attempt, err
))
})
}) {
Ok(batch) => batch,
Err(err) => {
let message = err.to_string();
run.errors.push(message.clone());
run.finish(PipelineRunStatus::Failed);
self.emit_event(
&run,
&context,
PipelineEventKind::RunFailed,
&message,
BTreeMap::new(),
BTreeMap::new(),
)?;
break;
}
};
let read_count = source_batch.records.len() as u64;
run.rows_read = run.rows_read.saturating_add(read_count);
if let Some(lineage) = source_batch.lineage.as_ref() {
run.latest_lineage = Some(lineage.clone());
}
if let Some(source_freshness) = source_batch.source_freshness_unix_ms {
run.latest_freshness_lag_ms = Some(now_unix_ms().saturating_sub(source_freshness));
}
self.emit_event(
&run,
&context,
PipelineEventKind::BatchRead,
"source batch read complete",
BTreeMap::from([("rows".to_string(), read_count)]),
BTreeMap::from([
(
"checkpoint".to_string(),
checkpoint.clone().unwrap_or_else(|| "none".to_string()),
),
("has_more".to_string(), source_batch.has_more.to_string()),
]),
)?;
if source_batch.records.is_empty() && !source_batch.has_more {
run.finish(PipelineRunStatus::Succeeded);
self.emit_event(
&run,
&context,
PipelineEventKind::RunSucceeded,
"pipeline run completed with no additional source records",
BTreeMap::new(),
BTreeMap::new(),
)?;
break;
}
let run_snapshot = run.clone();
let transformed =
match self.with_retry(PipelineStage::Transform, &context, &mut run, |_attempt| {
self.transform.transform_batch(
&self.pipeline,
&run_snapshot,
source_batch.records.clone(),
&context,
)
}) {
Ok(batch) => batch,
Err(err) => {
let reason = err.to_string();
for record in source_batch.records {
self.push_dead_letter(
&mut run,
PipelineDeadLetterInput {
stage: PipelineStage::Transform,
reason: reason.clone(),
record,
},
checkpoint.clone(),
1,
)?;
}
run.errors.push(reason.clone());
if let Some(next_checkpoint) = source_batch.next_checkpoint.clone() {
self.checkpoint_store.save_checkpoint(
&self.pipeline.id,
tenant_id,
&next_checkpoint,
)?;
run.last_checkpoint = Some(next_checkpoint.clone());
checkpoint = Some(next_checkpoint.clone());
self.emit_event(
&run,
&context,
PipelineEventKind::CheckpointSaved,
"checkpoint advanced after transform dead-letter fallback",
BTreeMap::new(),
BTreeMap::from([("checkpoint".to_string(), next_checkpoint)]),
)?;
}
if source_batch.has_more {
batches_processed += 1;
continue;
}
run.finish(PipelineRunStatus::Failed);
self.emit_event(
&run,
&context,
PipelineEventKind::RunFailed,
"transform stage exhausted and no additional source batches remain",
BTreeMap::new(),
BTreeMap::new(),
)?;
break;
}
};
let transformed_count = transformed.records.len() as u64;
run.rows_transformed = run.rows_transformed.saturating_add(transformed_count);
self.emit_event(
&run,
&context,
PipelineEventKind::BatchTransformed,
"transform batch complete",
BTreeMap::from([("rows".to_string(), transformed_count)]),
BTreeMap::new(),
)?;
for dead_letter in transformed.dead_letters {
self.push_dead_letter(&mut run, dead_letter, checkpoint.clone(), 1)?;
}
let write_result = if transformed.records.is_empty() {
SinkWriteResult::default()
} else {
let run_snapshot = run.clone();
match self.with_retry(PipelineStage::Sink, &context, &mut run, |_attempt| {
self.sink.write_batch(
&self.pipeline,
&run_snapshot,
&transformed.records,
&context,
)
}) {
Ok(result) => result,
Err(err) => {
let reason = err.to_string();
for record in transformed.records {
self.push_dead_letter(
&mut run,
PipelineDeadLetterInput {
stage: PipelineStage::Sink,
reason: reason.clone(),
record,
},
checkpoint.clone(),
1,
)?;
}
run.errors.push(reason);
SinkWriteResult::default()
}
}
};
run.rows_written = run.rows_written.saturating_add(write_result.written as u64);
run.duplicate_writes = run
.duplicate_writes
.saturating_add(write_result.duplicate_writes as u64);
self.emit_event(
&run,
&context,
PipelineEventKind::BatchWritten,
"sink batch write complete",
BTreeMap::from([
("written".to_string(), write_result.written as u64),
(
"duplicate_writes".to_string(),
write_result.duplicate_writes as u64,
),
]),
BTreeMap::new(),
)?;
for dead_letter in write_result.dead_letters {
self.push_dead_letter(&mut run, dead_letter, checkpoint.clone(), 1)?;
}
if let Some(next_checkpoint) = source_batch.next_checkpoint.clone() {
self.checkpoint_store.save_checkpoint(
&self.pipeline.id,
tenant_id,
&next_checkpoint,
)?;
run.last_checkpoint = Some(next_checkpoint.clone());
checkpoint = Some(next_checkpoint.clone());
self.emit_event(
&run,
&context,
PipelineEventKind::CheckpointSaved,
"checkpoint saved",
BTreeMap::new(),
BTreeMap::from([("checkpoint".to_string(), next_checkpoint)]),
)?;
}
batches_processed = batches_processed.saturating_add(1);
if !source_batch.has_more {
run.finish(PipelineRunStatus::Succeeded);
self.emit_event(
&run,
&context,
PipelineEventKind::RunSucceeded,
"pipeline run completed successfully",
BTreeMap::new(),
BTreeMap::new(),
)?;
break;
}
}
if run.finished_at_unix_ms.is_none() {
run.finished_at_unix_ms = Some(now_unix_ms());
}
Ok(run)
}
pub fn operation_snapshot(&self, run: &PipelineRun) -> DataResult<PipelineOperationSnapshot> {
let events = self.event_store.list_events(&run.id)?;
let dead_letters = self.dead_letter_store.list_dead_letters(&run.id)?;
Ok(PipelineOperationSnapshot {
run: run.clone(),
events,
dead_letters,
})
}
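    /// Runs `operation` up to `retry_policy.max_attempts` times. Retryable
    /// failures (see [`is_retryable_pipeline_error`]) emit a `Retrying`
    /// event, wait out the current backoff, and double it up to the
    /// configured ceiling; the final or non-retryable error is returned.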
fn with_retry<T, F>(
&self,
stage: PipelineStage,
context: &QueryContext,
run: &mut PipelineRun,
mut operation: F,
) -> DataResult<T>
where
F: FnMut(u32) -> DataResult<T>,
{
let attempts = self.pipeline.retry_policy.max_attempts.max(1);
let mut backoff_ms = self.pipeline.retry_policy.initial_backoff_ms;
for attempt in 1..=attempts {
match operation(attempt) {
Ok(value) => return Ok(value),
Err(err) if attempt == attempts || !is_retryable_pipeline_error(&err) => {
return Err(err);
}
Err(err) => {
run.retries = run.retries.saturating_add(1);
self.emit_event(
run,
context,
PipelineEventKind::Retrying,
"retrying pipeline stage after retryable error",
BTreeMap::from([
("attempt".to_string(), attempt as u64),
("max_attempts".to_string(), attempts as u64),
("backoff_ms".to_string(), backoff_ms),
]),
BTreeMap::from([
(
"stage".to_string(),
format!("{stage:?}").to_ascii_lowercase(),
),
("error".to_string(), err.to_string()),
]),
)?;
                    // Wait out the current backoff, then grow it
                    // exponentially up to the configured ceiling for the
                    // next attempt.
                    if backoff_ms > 0 {
                        thread::sleep(Duration::from_millis(backoff_ms));
                    }
                    backoff_ms = (backoff_ms.saturating_mul(2))
                        .min(self.pipeline.retry_policy.max_backoff_ms.max(1));
}
}
}
Err(DataError::Integration(
"retry exhausted without terminal error detail".to_string(),
))
}
fn emit_event(
&self,
run: &PipelineRun,
context: &QueryContext,
kind: PipelineEventKind,
message: &str,
metrics: BTreeMap<String, u64>,
metadata: BTreeMap<String, String>,
) -> DataResult<()> {
self.event_store.append_event(PipelineEvent {
run_id: run.id.clone(),
pipeline_id: run.pipeline_id.clone(),
timestamp_unix_ms: now_unix_ms(),
kind,
message: message.to_string(),
tenant_id: run.tenant_id.clone(),
trace_id: run.trace_id.clone(),
correlation_id: context.correlation_id().map(ToString::to_string),
request_id: context.request_id().map(ToString::to_string),
metrics,
metadata,
})
}
fn push_dead_letter(
&self,
run: &mut PipelineRun,
input: PipelineDeadLetterInput,
checkpoint: Option<String>,
attempt: u32,
) -> DataResult<()> {
run.rows_dead_letter = run.rows_dead_letter.saturating_add(1);
self.dead_letter_store.push_dead_letter(PipelineDeadLetter {
run_id: run.id.clone(),
pipeline_id: run.pipeline_id.clone(),
stage: input.stage,
reason: input.reason,
record: input.record,
checkpoint,
attempt,
timestamp_unix_ms: now_unix_ms(),
})
}
}
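/// A run together with its recorded events and dead letters, for operator
/// inspection.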
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineOperationSnapshot {
pub run: PipelineRun,
pub events: Vec<PipelineEvent>,
pub dead_letters: Vec<PipelineDeadLetter>,
}
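/// A [`Source`] over a fixed, pre-loaded vector of rows, paged by numeric
/// offset checkpoints; handy for tests.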
#[derive(Debug, Clone)]
pub struct InMemoryRecordSource {
source_name: String,
records: Vec<Row>,
lineage: Option<String>,
freshness_unix_ms: Option<u64>,
}
impl InMemoryRecordSource {
pub fn new(source_name: impl Into<String>, records: Vec<Row>) -> Self {
Self {
source_name: source_name.into(),
records,
lineage: None,
freshness_unix_ms: Some(now_unix_ms()),
}
}
pub fn with_lineage(mut self, lineage: impl Into<String>) -> Self {
self.lineage = Some(lineage.into());
self
}
pub fn with_freshness_unix_ms(mut self, freshness_unix_ms: u64) -> Self {
self.freshness_unix_ms = Some(freshness_unix_ms);
self
}
}
impl Source for InMemoryRecordSource {
fn source_name(&self) -> &str {
&self.source_name
}
fn read_batch(
&self,
pipeline: &Pipeline,
checkpoint: Option<&str>,
_context: &QueryContext,
) -> DataResult<PipelineBatch> {
let offset = parse_checkpoint_offset(checkpoint);
let limit = pipeline.batch_size.max(1);
let total_rows = self.records.len();
let rows = self
.records
.iter()
.skip(offset)
.take(limit)
.cloned()
.collect::<Vec<_>>();
let next_offset = offset.saturating_add(rows.len());
Ok(PipelineBatch {
records: rows,
next_checkpoint: Some(next_offset.to_string()),
has_more: next_offset < total_rows,
lineage: self.lineage.clone(),
source_freshness_unix_ms: self.freshness_unix_ms,
})
}
}
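/// A [`Source`] that reads newline-delimited JSON objects from a file. Each
/// call re-parses the entire file before applying the offset window, so it
/// is sized for small fixtures rather than large files.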
#[derive(Debug, Clone)]
pub struct FileJsonLineSource {
source_name: String,
path: PathBuf,
}
impl FileJsonLineSource {
pub fn new(source_name: impl Into<String>, path: impl Into<PathBuf>) -> Self {
Self {
source_name: source_name.into(),
path: path.into(),
}
}
}
impl Source for FileJsonLineSource {
fn source_name(&self) -> &str {
&self.source_name
}
fn read_batch(
&self,
pipeline: &Pipeline,
checkpoint: Option<&str>,
_context: &QueryContext,
) -> DataResult<PipelineBatch> {
if !self.path.exists() {
return Ok(PipelineBatch {
records: Vec::new(),
next_checkpoint: Some(parse_checkpoint_offset(checkpoint).to_string()),
has_more: false,
lineage: Some(format!("file://{}", self.path.display())),
source_freshness_unix_ms: None,
});
}
let file = OpenOptions::new().read(true).open(&self.path)?;
let reader = BufReader::new(file);
let mut records = Vec::new();
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let value: Value = serde_json::from_str(&line)?;
let Value::Object(map) = value else {
return Err(DataError::Integration(format!(
"file source expected JSON object row in {}",
self.path.display()
)));
};
records.push(BTreeMap::from_iter(map));
}
let offset = parse_checkpoint_offset(checkpoint);
let limit = pipeline.batch_size.max(1);
let total_rows = records.len();
let rows = records
.into_iter()
.skip(offset)
.take(limit)
.collect::<Vec<_>>();
let next_offset = offset.saturating_add(rows.len());
let freshness_unix_ms = fs::metadata(&self.path)
.ok()
.and_then(|metadata| metadata.modified().ok())
.and_then(system_time_to_unix_ms);
Ok(PipelineBatch {
records: rows,
next_checkpoint: Some(next_offset.to_string()),
has_more: next_offset < total_rows,
lineage: Some(format!("file://{}", self.path.display())),
source_freshness_unix_ms: freshness_unix_ms,
})
}
}
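/// A minimal in-memory stand-in for an object store: keys map to JSON
/// arrays.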
#[derive(Debug, Clone, Default)]
pub struct InMemoryObjectStore {
inner: Arc<Mutex<HashMap<String, Vec<Value>>>>,
}
impl InMemoryObjectStore {
pub fn put(&self, key: &str, payload: Vec<Value>) -> DataResult<()> {
let mut guard = self
.inner
.lock()
.map_err(|_| DataError::Integration("object store lock poisoned".to_string()))?;
guard.insert(key.to_string(), payload);
Ok(())
}
pub fn get(&self, key: &str) -> DataResult<Vec<Value>> {
let guard = self
.inner
.lock()
.map_err(|_| DataError::Integration("object store lock poisoned".to_string()))?;
Ok(guard.get(key).cloned().unwrap_or_default())
}
}
#[derive(Debug, Clone)]
pub struct ObjectStoreJsonSource {
source_name: String,
object_store: InMemoryObjectStore,
object_key: String,
}
impl ObjectStoreJsonSource {
pub fn new(
source_name: impl Into<String>,
object_store: InMemoryObjectStore,
object_key: impl Into<String>,
) -> Self {
Self {
source_name: source_name.into(),
object_store,
object_key: object_key.into(),
}
}
}
impl Source for ObjectStoreJsonSource {
fn source_name(&self) -> &str {
&self.source_name
}
fn read_batch(
&self,
pipeline: &Pipeline,
checkpoint: Option<&str>,
_context: &QueryContext,
) -> DataResult<PipelineBatch> {
let offset = parse_checkpoint_offset(checkpoint);
let limit = pipeline.batch_size.max(1);
let objects = self.object_store.get(&self.object_key)?;
let total_rows = objects.len();
let mut rows = Vec::new();
for value in objects.into_iter().skip(offset).take(limit) {
let Value::Object(map) = value else {
return Err(DataError::Integration(format!(
"object store source expected JSON object rows for key `{}`",
self.object_key
)));
};
rows.push(BTreeMap::from_iter(map));
}
let next_offset = offset.saturating_add(rows.len());
Ok(PipelineBatch {
records: rows,
next_checkpoint: Some(next_offset.to_string()),
has_more: next_offset < total_rows,
lineage: Some(format!("object://{}", self.object_key)),
source_freshness_unix_ms: Some(now_unix_ms()),
})
}
}
#[derive(Debug)]
struct IdempotentSinkState {
key_field: String,
rows: Vec<Row>,
seen: HashSet<String>,
}
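/// A [`Sink`] that deduplicates rows on a configurable key field (falling
/// back to the serialized row when the field is missing). The optional probe
/// runs before each batch is accepted and is how the conformance suite
/// injects transient failures.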
#[derive(Clone)]
pub struct IdempotentInMemorySink {
sink_name: String,
state: Arc<Mutex<IdempotentSinkState>>,
probe: Option<Arc<dyn Fn(&QueryContext) -> DataResult<()> + Send + Sync>>,
}
impl std::fmt::Debug for IdempotentInMemorySink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IdempotentInMemorySink")
.field("sink_name", &self.sink_name)
.finish_non_exhaustive()
}
}
impl IdempotentInMemorySink {
pub fn new(sink_name: impl Into<String>, key_field: impl Into<String>) -> Self {
Self {
sink_name: sink_name.into(),
state: Arc::new(Mutex::new(IdempotentSinkState {
key_field: key_field.into(),
rows: Vec::new(),
seen: HashSet::new(),
})),
probe: None,
}
}
pub fn with_probe(
mut self,
probe: Arc<dyn Fn(&QueryContext) -> DataResult<()> + Send + Sync>,
) -> Self {
self.probe = Some(probe);
self
}
pub fn rows(&self) -> DataResult<Vec<Row>> {
let guard = self
.state
.lock()
.map_err(|_| DataError::Integration("sink state lock poisoned".to_string()))?;
Ok(guard.rows.clone())
}
fn accept_records(&self, records: &[Row]) -> DataResult<(Vec<Row>, usize)> {
let mut guard = self
.state
.lock()
.map_err(|_| DataError::Integration("sink state lock poisoned".to_string()))?;
let key_field = guard.key_field.clone();
let mut accepted = Vec::new();
let mut duplicate_writes = 0usize;
for record in records {
let key = record
.get(&key_field)
.map(Value::to_string)
.unwrap_or_else(|| serde_json::to_string(record).unwrap_or_default());
if !guard.seen.insert(key) {
duplicate_writes = duplicate_writes.saturating_add(1);
continue;
}
guard.rows.push(record.clone());
accepted.push(record.clone());
}
Ok((accepted, duplicate_writes))
}
}
impl Sink for IdempotentInMemorySink {
fn sink_name(&self) -> &str {
&self.sink_name
}
fn write_batch(
&self,
_pipeline: &Pipeline,
_run: &PipelineRun,
records: &[Row],
context: &QueryContext,
) -> DataResult<SinkWriteResult> {
if let Some(probe) = &self.probe {
probe(context)?;
}
let (accepted, duplicate_writes) = self.accept_records(records)?;
Ok(SinkWriteResult {
written: accepted.len(),
duplicate_writes,
dead_letters: Vec::new(),
})
}
}
#[derive(Debug, Clone)]
pub struct FileJsonLineSink {
sink_name: String,
path: PathBuf,
dedupe: IdempotentInMemorySink,
}
impl FileJsonLineSink {
pub fn new(
sink_name: impl Into<String>,
path: impl Into<PathBuf>,
key_field: impl Into<String>,
) -> Self {
Self {
sink_name: sink_name.into(),
path: path.into(),
dedupe: IdempotentInMemorySink::new("file-jsonline-dedupe", key_field),
}
}
}
impl Sink for FileJsonLineSink {
fn sink_name(&self) -> &str {
&self.sink_name
}
fn write_batch(
&self,
_pipeline: &Pipeline,
_run: &PipelineRun,
records: &[Row],
context: &QueryContext,
) -> DataResult<SinkWriteResult> {
if let Some(probe) = &self.dedupe.probe {
probe(context)?;
}
let (accepted, duplicate_writes) = self.dedupe.accept_records(records)?;
if accepted.is_empty() {
return Ok(SinkWriteResult {
written: 0,
duplicate_writes,
dead_letters: Vec::new(),
});
}
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
for row in &accepted {
let json = serde_json::to_string(row)?;
file.write_all(json.as_bytes())?;
file.write_all(b"\n")?;
}
Ok(SinkWriteResult {
written: accepted.len(),
duplicate_writes,
dead_letters: Vec::new(),
})
}
}
#[derive(Debug, Clone)]
pub struct ObjectStoreJsonSink {
sink_name: String,
object_store: InMemoryObjectStore,
object_key: String,
dedupe: IdempotentInMemorySink,
}
impl ObjectStoreJsonSink {
pub fn new(
sink_name: impl Into<String>,
object_store: InMemoryObjectStore,
object_key: impl Into<String>,
key_field: impl Into<String>,
) -> Self {
Self {
sink_name: sink_name.into(),
object_store,
object_key: object_key.into(),
dedupe: IdempotentInMemorySink::new("object-json-dedupe", key_field),
}
}
}
impl Sink for ObjectStoreJsonSink {
fn sink_name(&self) -> &str {
&self.sink_name
}
fn write_batch(
&self,
_pipeline: &Pipeline,
_run: &PipelineRun,
records: &[Row],
context: &QueryContext,
) -> DataResult<SinkWriteResult> {
if let Some(probe) = &self.dedupe.probe {
probe(context)?;
}
let (accepted, duplicate_writes) = self.dedupe.accept_records(records)?;
if accepted.is_empty() {
return Ok(SinkWriteResult {
written: 0,
duplicate_writes,
dead_letters: Vec::new(),
});
}
let mut payload = self.object_store.get(&self.object_key)?;
for row in &accepted {
payload.push(serde_json::to_value(row)?);
}
self.object_store.put(&self.object_key, payload)?;
Ok(SinkWriteResult {
written: accepted.len(),
duplicate_writes,
dead_letters: Vec::new(),
})
}
}
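// The adapter-backed sources below share one pattern: translate the numeric
// checkpoint into a `DataWindowRequest` offset, run a windowed query through
// the adapter call contract, and return the window's rows with the next
// offset as the new checkpoint. The artificial delay exists so the
// conformance suite can provoke context timeouts.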
#[derive(Debug, Clone)]
pub struct SingleStorePipelineSource<A: SingleStoreAdapter + ?Sized> {
source_name: String,
adapter: Arc<A>,
query: SqlCommand,
dataset: String,
contract: AdapterCallContract,
artificial_delay_ms: u64,
}
impl<A: SingleStoreAdapter + ?Sized> SingleStorePipelineSource<A> {
pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
Self {
source_name: "singlestore".to_string(),
adapter,
query,
dataset: dataset.into(),
contract: AdapterCallContract::default_query(),
artificial_delay_ms: 0,
}
}
pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
self.contract = contract;
self
}
pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
self.artificial_delay_ms = artificial_delay_ms;
self
}
}
impl<A: SingleStoreAdapter + ?Sized + Send + Sync> Source for SingleStorePipelineSource<A> {
fn source_name(&self) -> &str {
&self.source_name
}
fn read_batch(
&self,
pipeline: &Pipeline,
checkpoint: Option<&str>,
context: &QueryContext,
) -> DataResult<PipelineBatch> {
let offset = parse_checkpoint_offset(checkpoint);
let request = DataWindowRequest {
dataset: self.dataset.clone(),
offset,
limit: pipeline.batch_size.max(1),
query_token: None,
window_token: checkpoint.map(ToString::to_string),
wire_format: WireFormatProfile::Json,
};
let response = map_integration_result(
"singlestore.pipeline_source",
run_with_contract(
"singlestore",
"pipeline_source.read_batch",
self.contract,
context,
|_| {
if self.artificial_delay_ms > 0 {
thread::sleep(Duration::from_millis(self.artificial_delay_ms));
}
self.adapter.run_high_volume_window_query(
self.query.clone(),
request.clone(),
context,
)
},
),
)?;
let next_offset = response.offset.saturating_add(response.rows.len());
Ok(PipelineBatch {
records: response.rows,
next_checkpoint: Some(next_offset.to_string()),
has_more: next_offset < response.total_rows,
lineage: Some(response.query_token),
source_freshness_unix_ms: Some(now_unix_ms()),
})
}
}
#[derive(Debug, Clone)]
pub struct ClickHousePipelineSource<A: ClickHouseAdapter + ?Sized> {
source_name: String,
adapter: Arc<A>,
query: SqlCommand,
dataset: String,
contract: AdapterCallContract,
artificial_delay_ms: u64,
}
impl<A: ClickHouseAdapter + ?Sized> ClickHousePipelineSource<A> {
pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
Self {
source_name: "clickhouse".to_string(),
adapter,
query,
dataset: dataset.into(),
contract: AdapterCallContract::default_query(),
artificial_delay_ms: 0,
}
}
pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
self.contract = contract;
self
}
pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
self.artificial_delay_ms = artificial_delay_ms;
self
}
}
impl<A: ClickHouseAdapter + ?Sized + Send + Sync> Source for ClickHousePipelineSource<A> {
fn source_name(&self) -> &str {
&self.source_name
}
fn read_batch(
&self,
pipeline: &Pipeline,
checkpoint: Option<&str>,
context: &QueryContext,
) -> DataResult<PipelineBatch> {
let offset = parse_checkpoint_offset(checkpoint);
let request = DataWindowRequest {
dataset: self.dataset.clone(),
offset,
limit: pipeline.batch_size.max(1),
query_token: None,
window_token: checkpoint.map(ToString::to_string),
wire_format: WireFormatProfile::Json,
};
let response = map_integration_result(
"clickhouse.pipeline_source",
run_with_contract(
"clickhouse",
"pipeline_source.read_batch",
self.contract,
context,
|_| {
if self.artificial_delay_ms > 0 {
thread::sleep(Duration::from_millis(self.artificial_delay_ms));
}
self.adapter.run_high_volume_window_query(
self.query.clone(),
request.clone(),
context,
)
},
),
)?;
let next_offset = response.offset.saturating_add(response.rows.len());
Ok(PipelineBatch {
records: response.rows,
next_checkpoint: Some(next_offset.to_string()),
has_more: next_offset < response.total_rows,
lineage: Some(response.query_token),
source_freshness_unix_ms: Some(now_unix_ms()),
})
}
}
#[derive(Debug, Clone)]
pub struct BigQueryPipelineSource<A: BigQueryAdapter + ?Sized> {
source_name: String,
adapter: Arc<A>,
query: SqlCommand,
dataset: String,
contract: AdapterCallContract,
artificial_delay_ms: u64,
}
impl<A: BigQueryAdapter + ?Sized> BigQueryPipelineSource<A> {
pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
Self {
source_name: "bigquery".to_string(),
adapter,
query,
dataset: dataset.into(),
contract: AdapterCallContract::default_query(),
artificial_delay_ms: 0,
}
}
pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
self.contract = contract;
self
}
pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
self.artificial_delay_ms = artificial_delay_ms;
self
}
}
impl<A: BigQueryAdapter + ?Sized + Send + Sync> Source for BigQueryPipelineSource<A> {
fn source_name(&self) -> &str {
&self.source_name
}
fn read_batch(
&self,
pipeline: &Pipeline,
checkpoint: Option<&str>,
context: &QueryContext,
) -> DataResult<PipelineBatch> {
let offset = parse_checkpoint_offset(checkpoint);
let request = DataWindowRequest {
dataset: self.dataset.clone(),
offset,
limit: pipeline.batch_size.max(1),
query_token: None,
window_token: checkpoint.map(ToString::to_string),
wire_format: WireFormatProfile::Json,
};
let response = map_integration_result(
"bigquery.pipeline_source",
run_with_contract(
"bigquery",
"pipeline_source.read_batch",
self.contract,
context,
|_| {
if self.artificial_delay_ms > 0 {
thread::sleep(Duration::from_millis(self.artificial_delay_ms));
}
self.adapter.run_high_volume_window_query(
self.query.clone(),
request.clone(),
context,
)
},
),
)?;
let next_offset = response.offset.saturating_add(response.rows.len());
Ok(PipelineBatch {
records: response.rows,
next_checkpoint: Some(next_offset.to_string()),
has_more: next_offset < response.total_rows,
lineage: Some(response.query_token),
source_freshness_unix_ms: Some(now_unix_ms()),
})
}
}
#[derive(Debug, Clone)]
pub struct OpenSearchPipelineSource<A: OpenSearchAdapter + ?Sized> {
source_name: String,
adapter: Arc<A>,
request_template: SearchRequest,
contract: AdapterCallContract,
artificial_delay_ms: u64,
}
impl<A: OpenSearchAdapter + ?Sized> OpenSearchPipelineSource<A> {
pub fn new(adapter: Arc<A>, request_template: SearchRequest) -> Self {
Self {
source_name: "opensearch".to_string(),
adapter,
request_template,
contract: AdapterCallContract::default_query(),
artificial_delay_ms: 0,
}
}
pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
self.contract = contract;
self
}
pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
self.artificial_delay_ms = artificial_delay_ms;
self
}
}
impl<A: OpenSearchAdapter + ?Sized + Send + Sync> Source for OpenSearchPipelineSource<A> {
fn source_name(&self) -> &str {
&self.source_name
}
fn read_batch(
&self,
pipeline: &Pipeline,
checkpoint: Option<&str>,
context: &QueryContext,
) -> DataResult<PipelineBatch> {
let offset = parse_checkpoint_offset(checkpoint);
let window = DataWindowRequest {
dataset: self.request_template.index.clone(),
offset,
limit: pipeline.batch_size.max(1),
query_token: None,
window_token: checkpoint.map(ToString::to_string),
wire_format: WireFormatProfile::Json,
};
let response = map_integration_result(
"opensearch.pipeline_source",
run_with_contract(
"opensearch",
"pipeline_source.read_batch",
self.contract,
context,
|_| {
if self.artificial_delay_ms > 0 {
thread::sleep(Duration::from_millis(self.artificial_delay_ms));
}
self.adapter.search_window(
self.request_template.clone(),
window.clone(),
context,
)
},
),
)?;
let next_offset = response.offset.saturating_add(response.rows.len());
Ok(PipelineBatch {
records: response.rows,
next_checkpoint: Some(next_offset.to_string()),
has_more: next_offset < response.total_rows,
lineage: Some(response.query_token),
source_freshness_unix_ms: Some(now_unix_ms()),
})
}
}
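// The adapter-backed sinks below also share one pattern: run a cheap probe
// statement/search through the adapter call contract (exercising
// connectivity, timeout, and error classification), then delegate the actual
// write and deduplication to an inner IdempotentInMemorySink.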
#[derive(Clone)]
pub struct SingleStorePipelineSink<A: SingleStoreAdapter + ?Sized> {
adapter: Arc<A>,
probe_statement: String,
contract: AdapterCallContract,
inner: IdempotentInMemorySink,
}
impl<A: SingleStoreAdapter + ?Sized> std::fmt::Debug for SingleStorePipelineSink<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SingleStorePipelineSink")
.finish_non_exhaustive()
}
}
impl<A: SingleStoreAdapter + ?Sized> SingleStorePipelineSink<A> {
pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
Self {
adapter,
probe_statement: "SELECT 1".to_string(),
contract: AdapterCallContract::default_query(),
inner: IdempotentInMemorySink::new("singlestore-pipeline-sink", key_field),
}
}
pub fn with_probe_statement(mut self, statement: impl Into<String>) -> Self {
self.probe_statement = statement.into();
self
}
pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
self.contract = contract;
self
}
}
impl<A: SingleStoreAdapter + ?Sized + Send + Sync> Sink for SingleStorePipelineSink<A> {
fn sink_name(&self) -> &str {
"singlestore"
}
fn write_batch(
&self,
pipeline: &Pipeline,
run: &PipelineRun,
records: &[Row],
context: &QueryContext,
) -> DataResult<SinkWriteResult> {
map_integration_result(
"singlestore.pipeline_sink",
run_with_contract(
"singlestore",
"pipeline_sink.write_batch",
self.contract,
context,
|_| {
self.adapter
.run_query(
SqlCommand::new(self.probe_statement.clone(), Vec::new()),
context,
)
.map(|_| ())
},
),
)?;
self.inner.write_batch(pipeline, run, records, context)
}
}
#[derive(Clone)]
pub struct ClickHousePipelineSink<A: ClickHouseAdapter + ?Sized> {
adapter: Arc<A>,
probe_statement: String,
contract: AdapterCallContract,
inner: IdempotentInMemorySink,
}
impl<A: ClickHouseAdapter + ?Sized> std::fmt::Debug for ClickHousePipelineSink<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClickHousePipelineSink")
.finish_non_exhaustive()
}
}
impl<A: ClickHouseAdapter + ?Sized> ClickHousePipelineSink<A> {
pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
Self {
adapter,
probe_statement: "SELECT 1".to_string(),
contract: AdapterCallContract::default_query(),
inner: IdempotentInMemorySink::new("clickhouse-pipeline-sink", key_field),
}
}
}
impl<A: ClickHouseAdapter + ?Sized + Send + Sync> Sink for ClickHousePipelineSink<A> {
fn sink_name(&self) -> &str {
"clickhouse"
}
fn write_batch(
&self,
pipeline: &Pipeline,
run: &PipelineRun,
records: &[Row],
context: &QueryContext,
) -> DataResult<SinkWriteResult> {
map_integration_result(
"clickhouse.pipeline_sink",
run_with_contract(
"clickhouse",
"pipeline_sink.write_batch",
self.contract,
context,
|_| {
self.adapter
.run_query(
SqlCommand::new(self.probe_statement.clone(), Vec::new()),
context,
)
.map(|_| ())
},
),
)?;
self.inner.write_batch(pipeline, run, records, context)
}
}
#[derive(Clone)]
pub struct BigQueryPipelineSink<A: BigQueryAdapter + ?Sized> {
adapter: Arc<A>,
probe_statement: String,
contract: AdapterCallContract,
inner: IdempotentInMemorySink,
}
impl<A: BigQueryAdapter + ?Sized> std::fmt::Debug for BigQueryPipelineSink<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BigQueryPipelineSink")
.finish_non_exhaustive()
}
}
impl<A: BigQueryAdapter + ?Sized> BigQueryPipelineSink<A> {
pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
Self {
adapter,
probe_statement: "SELECT 1".to_string(),
contract: AdapterCallContract::default_query(),
inner: IdempotentInMemorySink::new("bigquery-pipeline-sink", key_field),
}
}
}
impl<A: BigQueryAdapter + ?Sized + Send + Sync> Sink for BigQueryPipelineSink<A> {
fn sink_name(&self) -> &str {
"bigquery"
}
fn write_batch(
&self,
pipeline: &Pipeline,
run: &PipelineRun,
records: &[Row],
context: &QueryContext,
) -> DataResult<SinkWriteResult> {
map_integration_result(
"bigquery.pipeline_sink",
run_with_contract(
"bigquery",
"pipeline_sink.write_batch",
self.contract,
context,
|_| {
self.adapter
.run_query(
SqlCommand::new(self.probe_statement.clone(), Vec::new()),
context,
)
.map(|_| ())
},
),
)?;
self.inner.write_batch(pipeline, run, records, context)
}
}
#[derive(Clone)]
pub struct OpenSearchPipelineSink<A: OpenSearchAdapter + ?Sized> {
adapter: Arc<A>,
probe_request: SearchRequest,
contract: AdapterCallContract,
inner: IdempotentInMemorySink,
}
impl<A: OpenSearchAdapter + ?Sized> std::fmt::Debug for OpenSearchPipelineSink<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenSearchPipelineSink")
.finish_non_exhaustive()
}
}
impl<A: OpenSearchAdapter + ?Sized> OpenSearchPipelineSink<A> {
pub fn new(
adapter: Arc<A>,
key_field: impl Into<String>,
probe_index: impl Into<String>,
) -> Self {
Self {
adapter,
probe_request: SearchRequest::new(probe_index, ""),
contract: AdapterCallContract::default_query(),
inner: IdempotentInMemorySink::new("opensearch-pipeline-sink", key_field),
}
}
}
impl<A: OpenSearchAdapter + ?Sized + Send + Sync> Sink for OpenSearchPipelineSink<A> {
fn sink_name(&self) -> &str {
"opensearch"
}
fn write_batch(
&self,
pipeline: &Pipeline,
run: &PipelineRun,
records: &[Row],
context: &QueryContext,
) -> DataResult<SinkWriteResult> {
map_integration_result(
"opensearch.pipeline_sink",
run_with_contract(
"opensearch",
"pipeline_sink.write_batch",
self.contract,
context,
|_| {
self.adapter
.search(self.probe_request.clone(), context)
.map(|_| ())
},
),
)?;
self.inner.write_batch(pipeline, run, records, context)
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineConformanceCheck {
pub name: String,
pub passed: bool,
pub detail: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PipelineConformanceReport {
pub checks: Vec<PipelineConformanceCheck>,
}
impl PipelineConformanceReport {
pub fn passed(&self) -> bool {
self.checks.iter().all(|check| check.passed)
}
fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
self.checks.push(PipelineConformanceCheck {
name: name.into(),
passed: true,
detail: detail.into(),
});
}
fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
self.checks.push(PipelineConformanceCheck {
name: name.into(),
passed: false,
detail: detail.into(),
});
}
}
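/// Runs checkpoint-resume, retry, and timeout probes against all four
/// adapter families and collects the pass/fail outcomes into one report.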
pub fn run_pipeline_adapter_conformance_suite(
singlestore: Arc<dyn SingleStoreAdapter>,
clickhouse: Arc<dyn ClickHouseAdapter>,
bigquery: Arc<dyn BigQueryAdapter>,
opensearch: Arc<dyn OpenSearchAdapter>,
context: &QueryContext,
) -> PipelineConformanceReport {
let mut report = PipelineConformanceReport::default();
let sql_query = SqlCommand::new(
"SELECT tenant, active_accounts FROM tenant_rollup",
Vec::new(),
);
let search_request = SearchRequest::new("accounts", "");
let mut checkpoint_resume_checks = Vec::new();
checkpoint_resume_checks.push(run_checkpoint_resume_probe(
"singlestore",
Pipeline::new(
"m60-singlestore-checkpoint",
"M60 SingleStore Checkpoint",
"singlestore",
"passthrough",
"idempotent_sink",
),
Arc::new(SingleStorePipelineSource::new(
singlestore.clone(),
sql_query.clone(),
"tenant_rollup",
)),
Arc::new(IdempotentInMemorySink::new(
"m60-sink-singlestore",
"tenant",
)),
context.clone(),
));
checkpoint_resume_checks.push(run_checkpoint_resume_probe(
"clickhouse",
Pipeline::new(
"m60-clickhouse-checkpoint",
"M60 ClickHouse Checkpoint",
"clickhouse",
"passthrough",
"idempotent_sink",
),
Arc::new(ClickHousePipelineSource::new(
clickhouse.clone(),
sql_query.clone(),
"tenant_rollup",
)),
Arc::new(IdempotentInMemorySink::new("m60-sink-clickhouse", "tenant")),
context.clone(),
));
checkpoint_resume_checks.push(run_checkpoint_resume_probe(
"bigquery",
Pipeline::new(
"m60-bigquery-checkpoint",
"M60 BigQuery Checkpoint",
"bigquery",
"passthrough",
"idempotent_sink",
),
Arc::new(BigQueryPipelineSource::new(
bigquery.clone(),
sql_query.clone(),
"tenant_rollup",
)),
Arc::new(IdempotentInMemorySink::new("m60-sink-bigquery", "tenant")),
context.clone(),
));
checkpoint_resume_checks.push(run_checkpoint_resume_probe(
"opensearch",
Pipeline::new(
"m60-opensearch-checkpoint",
"M60 OpenSearch Checkpoint",
"opensearch",
"passthrough",
"idempotent_sink",
),
Arc::new(OpenSearchPipelineSource::new(
opensearch.clone(),
search_request.clone(),
)),
Arc::new(IdempotentInMemorySink::new(
"m60-sink-opensearch",
"account",
)),
context.clone(),
));
for (driver, passed, detail) in checkpoint_resume_checks {
if passed {
report.add_pass(format!("{driver}.checkpoint_resume"), detail);
} else {
report.add_fail(format!("{driver}.checkpoint_resume"), detail);
}
}
let mut retry_checks = Vec::new();
retry_checks.push(run_retry_probe(
"singlestore",
Arc::new(SingleStorePipelineSource::new(
singlestore.clone(),
sql_query.clone(),
"tenant_rollup",
)),
context.clone(),
));
retry_checks.push(run_retry_probe(
"clickhouse",
Arc::new(ClickHousePipelineSource::new(
clickhouse.clone(),
sql_query.clone(),
"tenant_rollup",
)),
context.clone(),
));
retry_checks.push(run_retry_probe(
"bigquery",
Arc::new(BigQueryPipelineSource::new(
bigquery.clone(),
sql_query.clone(),
"tenant_rollup",
)),
context.clone(),
));
retry_checks.push(run_retry_probe(
"opensearch",
Arc::new(OpenSearchPipelineSource::new(
opensearch.clone(),
search_request.clone(),
)),
context.clone(),
));
for (driver, passed, detail) in retry_checks {
if passed {
report.add_pass(format!("{driver}.retry"), detail);
} else {
report.add_fail(format!("{driver}.retry"), detail);
}
}
let mut timeout_checks = Vec::new();
timeout_checks.push(run_timeout_probe(
"singlestore",
Arc::new(
SingleStorePipelineSource::new(singlestore, sql_query.clone(), "tenant_rollup")
.with_artificial_delay_ms(5),
),
context.clone().with_timeout_ms(1),
));
timeout_checks.push(run_timeout_probe(
"clickhouse",
Arc::new(
ClickHousePipelineSource::new(clickhouse, sql_query.clone(), "tenant_rollup")
.with_artificial_delay_ms(5),
),
context.clone().with_timeout_ms(1),
));
timeout_checks.push(run_timeout_probe(
"bigquery",
Arc::new(
BigQueryPipelineSource::new(bigquery, sql_query, "tenant_rollup")
.with_artificial_delay_ms(5),
),
context.clone().with_timeout_ms(1),
));
timeout_checks.push(run_timeout_probe(
"opensearch",
Arc::new(
OpenSearchPipelineSource::new(opensearch, search_request).with_artificial_delay_ms(5),
),
context.clone().with_timeout_ms(1),
));
for (driver, passed, detail) in timeout_checks {
if passed {
report.add_pass(format!("{driver}.timeout"), detail);
} else {
report.add_fail(format!("{driver}.timeout"), detail);
}
}
report
}
fn run_checkpoint_resume_probe(
driver: &str,
base_pipeline: Pipeline,
source: Arc<dyn Source>,
sink: Arc<dyn Sink>,
context: QueryContext,
) -> (String, bool, String) {
let checkpoint_store = Arc::new(InMemoryCheckpointStore::default());
let dead_letters = Arc::new(InMemoryDeadLetterStore::default());
let events = Arc::new(InMemoryPipelineEventStore::default());
let first_runtime = PipelineRuntime::new(
base_pipeline
.clone()
.with_batch_size(1)
.with_max_batches_per_run(Some(1)),
source.clone(),
Arc::new(PassthroughTransform),
sink.clone(),
checkpoint_store.clone(),
dead_letters.clone(),
events.clone(),
);
let first = first_runtime.execute_manual(context.clone(), PipelineControl::default(), None);
let Ok(first_run) = first else {
return (
driver.to_string(),
false,
format!("initial run failed: {}", first.unwrap_err()),
);
};
if first_run.status != PipelineRunStatus::Paused {
return (
driver.to_string(),
false,
format!("expected first status paused, got {:?}", first_run.status),
);
}
if first_run.last_checkpoint.is_none() {
return (
driver.to_string(),
false,
"first run did not save checkpoint".to_string(),
);
}
let second_runtime = PipelineRuntime::new(
base_pipeline.with_batch_size(1),
source,
Arc::new(PassthroughTransform),
sink.clone(),
checkpoint_store,
dead_letters,
events,
);
let second = second_runtime.execute_manual(context.clone(), PipelineControl::default(), None);
let Ok(second_run) = second else {
return (
driver.to_string(),
false,
format!("resume run failed: {}", second.unwrap_err()),
);
};
if second_run.status != PipelineRunStatus::Succeeded {
return (
driver.to_string(),
false,
format!(
"expected resume status succeeded, got {:?}",
second_run.status
),
);
}
let third = second_runtime.execute_manual(context, PipelineControl::default(), None);
let Ok(third_run) = third else {
return (
driver.to_string(),
false,
format!("repeat run failed: {}", third.unwrap_err()),
);
};
if third_run.rows_written != 0 {
return (
driver.to_string(),
false,
format!(
"expected idempotent repeat run writes=0 got {}",
third_run.rows_written
),
);
}
(
driver.to_string(),
true,
format!(
"resume status={:?}, repeat writes={}",
second_run.status, third_run.rows_written
),
)
}
fn run_retry_probe(
driver: &str,
source: Arc<dyn Source>,
context: QueryContext,
) -> (String, bool, String) {
let attempts = Arc::new(AtomicUsize::new(0));
let attempts_clone = attempts.clone();
let sink = Arc::new(
IdempotentInMemorySink::new(format!("m60-flaky-{driver}"), "tenant").with_probe(Arc::new(
move |_ctx| {
let attempt = attempts_clone.fetch_add(1, Ordering::SeqCst);
if attempt == 0 {
return Err(DataError::Integration(
"transient sink probe failure".to_string(),
));
}
Ok(())
},
)),
);
let runtime = PipelineRuntime::new(
Pipeline::new(
format!("m60-{driver}-retry"),
format!("M60 {driver} Retry Probe"),
driver,
"passthrough",
"flaky_sink",
)
.with_batch_size(1)
.with_retry_policy(RetryPolicy {
max_attempts: 2,
initial_backoff_ms: 0,
max_backoff_ms: 0,
}),
source,
Arc::new(PassthroughTransform),
sink,
Arc::new(InMemoryCheckpointStore::default()),
Arc::new(InMemoryDeadLetterStore::default()),
Arc::new(InMemoryPipelineEventStore::default()),
);
match runtime.execute_manual(context, PipelineControl::default(), None) {
Ok(run) if run.status == PipelineRunStatus::Succeeded && run.retries >= 1 => {
(driver.to_string(), true, format!("retries={}", run.retries))
}
Ok(run) => (
driver.to_string(),
false,
format!(
"unexpected run status={:?} retries={}",
run.status, run.retries
),
),
Err(err) => (
driver.to_string(),
false,
format!("retry probe failed: {err}"),
),
}
}
fn run_timeout_probe(
driver: &str,
source: Arc<dyn Source>,
context: QueryContext,
) -> (String, bool, String) {
let runtime = PipelineRuntime::new(
Pipeline::new(
format!("m60-{driver}-timeout"),
format!("M60 {driver} Timeout Probe"),
driver,
"passthrough",
"noop_sink",
)
.with_batch_size(1)
.with_deadline_ms(1_000)
.with_retry_policy(RetryPolicy::never()),
source,
Arc::new(PassthroughTransform),
Arc::new(IdempotentInMemorySink::new("m60-timeout-sink", "tenant")),
Arc::new(InMemoryCheckpointStore::default()),
Arc::new(InMemoryDeadLetterStore::default()),
Arc::new(InMemoryPipelineEventStore::default()),
);
match runtime.execute_manual(context, PipelineControl::default(), None) {
Ok(run) => {
let detail = run.errors.join(" | ");
let passed = run.status == PipelineRunStatus::Failed
&& detail.to_ascii_lowercase().contains("timeout");
(
driver.to_string(),
passed,
if passed {
"timeout classification observed".to_string()
} else {
format!(
"expected failed timeout run, got status={:?} errors={detail}",
run.status
)
},
)
}
Err(err) => (driver.to_string(), false, format!("runtime failed: {err}")),
}
}
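/// Parses a checkpoint into a row offset. Accepts a bare integer (`"42"`) or
/// an embedded token (`"cursor:offset=42"`); anything unparseable resets to
/// offset 0.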
fn parse_checkpoint_offset(checkpoint: Option<&str>) -> usize {
let Some(raw) = checkpoint else {
return 0;
};
let trimmed = raw.trim();
if trimmed.is_empty() {
return 0;
}
if let Ok(value) = trimmed.parse::<usize>() {
return value;
}
if let Some(value) = trimmed
.split("offset=")
.nth(1)
.and_then(|tail| tail.split(':').next())
.and_then(|value| value.parse::<usize>().ok())
{
return value;
}
0
}
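/// Message-based retry classification: integration and query errors are
/// retried when they mention a transient condition; I/O errors are always
/// retried; everything else (for example, validation errors) is terminal.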
fn is_retryable_pipeline_error(err: &DataError) -> bool {
match err {
DataError::Integration(message) | DataError::Query(message) => {
let normalized = message.to_ascii_lowercase();
normalized.contains("transient")
|| normalized.contains("timeout")
|| normalized.contains("unavailable")
|| normalized.contains("retry")
}
DataError::Io(_) => true,
_ => false,
}
}
fn now_unix_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::from_secs(0))
.as_millis() as u64
}
fn system_time_to_unix_ms(value: SystemTime) -> Option<u64> {
value
.duration_since(UNIX_EPOCH)
.ok()
.map(|duration| duration.as_millis() as u64)
}
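/// Process-wide monotonically increasing counter, used when minting unique
/// run identifiers.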
fn next_counter() -> u64 {
static RUN_COUNTER: AtomicU64 = AtomicU64::new(1);
RUN_COUNTER.fetch_add(1, Ordering::Relaxed)
}
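/// Serializes rows as newline-delimited JSON: one object per line, every line
/// terminated with '\n', so two rows yield a two-line string ending in a
/// newline.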
pub fn pipeline_records_to_json_lines(records: &[Row]) -> DataResult<String> {
let mut out = String::new();
for row in records {
out.push_str(&serde_json::to_string(row)?);
out.push('\n');
}
Ok(out)
}
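/// Serializes rows as JSON lines and writes them to `path`, replacing any
/// existing file contents.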
pub fn write_pipeline_records_to_file(path: &Path, records: &[Row]) -> DataResult<()> {
let payload = pipeline_records_to_json_lines(records)?;
fs::write(path, payload)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
InMemoryBigQueryAdapter, InMemoryClickHouseAdapter, InMemoryOpenSearchAdapter,
InMemorySingleStoreAdapter, StoredRow,
};
use serde_json::json;
use std::collections::VecDeque;
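// Builds a Row from literal key/value pairs; shared by every test below.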
fn row(pairs: &[(&str, Value)]) -> Row {
pairs
.iter()
.map(|(key, value)| ((*key).to_string(), value.clone()))
.collect()
}
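// Test source that replays a scripted queue of batch results, so a test can
// inject successes and failures in a fixed order. Once the queue is drained
// it yields empty batches.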
#[derive(Debug, Clone)]
struct SequenceSource {
source_name: String,
responses: Arc<Mutex<VecDeque<DataResult<PipelineBatch>>>>,
}
impl SequenceSource {
fn new(
source_name: impl Into<String>,
responses: impl IntoIterator<Item = DataResult<PipelineBatch>>,
) -> Self {
Self {
source_name: source_name.into(),
responses: Arc::new(Mutex::new(responses.into_iter().collect())),
}
}
}
impl Source for SequenceSource {
fn source_name(&self) -> &str {
&self.source_name
}
fn read_batch(
&self,
_pipeline: &Pipeline,
_checkpoint: Option<&str>,
_context: &QueryContext,
) -> DataResult<PipelineBatch> {
let mut guard = self
.responses
.lock()
.map_err(|_| DataError::Integration("sequence source lock poisoned".to_string()))?;
Ok(guard
.pop_front()
.transpose()?
.unwrap_or_else(PipelineBatch::empty))
}
}
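// Transform stub that rejects every batch with a validation error.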
#[derive(Debug, Clone)]
struct AlwaysFailTransform {
message: String,
}
impl Transform for AlwaysFailTransform {
fn transform_name(&self) -> &str {
"always-fail"
}
fn transform_batch(
&self,
_pipeline: &Pipeline,
_run: &PipelineRun,
_records: Vec<Row>,
_context: &QueryContext,
) -> DataResult<TransformBatch> {
Err(DataError::Validation(self.message.clone()))
}
}
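// Sink stub that rejects every write with a validation error.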
#[derive(Debug, Clone)]
struct AlwaysFailSink {
sink_name: String,
message: String,
}
impl Sink for AlwaysFailSink {
fn sink_name(&self) -> &str {
&self.sink_name
}
fn write_batch(
&self,
_pipeline: &Pipeline,
_run: &PipelineRun,
_records: &[Row],
_context: &QueryContext,
) -> DataResult<SinkWriteResult> {
Err(DataError::Validation(self.message.clone()))
}
}
#[test]
fn pipeline_runtime_resume_checkpoint_and_idempotent_sink_prevents_duplicates() {
let source = Arc::new(InMemoryRecordSource::new(
"in-memory",
vec![
row(&[("id", json!(1)), ("account", json!("Acme"))]),
row(&[("id", json!(2)), ("account", json!("Globex"))]),
row(&[("id", json!(3)), ("account", json!("Umbrella"))]),
],
));
let sink = Arc::new(IdempotentInMemorySink::new("idempotent", "id"));
let checkpoints = Arc::new(InMemoryCheckpointStore::default());
let dead_letters = Arc::new(InMemoryDeadLetterStore::default());
let events = Arc::new(InMemoryPipelineEventStore::default());
let context = QueryContext::default().with_tenant_id("tenant-a");
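// First run: batch size 1 capped at one batch per run, so the pipeline
// pauses after writing a single row and persists a checkpoint.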
let first = PipelineRuntime::new(
Pipeline::new(
"pipeline-resume",
"Resume Pipeline",
"source",
"transform",
"sink",
)
.with_batch_size(1)
.with_max_batches_per_run(Some(1)),
source.clone(),
Arc::new(PassthroughTransform),
sink.clone(),
checkpoints.clone(),
dead_letters.clone(),
events.clone(),
)
.execute_manual(context.clone(), PipelineControl::default(), None)
.unwrap();
assert_eq!(first.status, PipelineRunStatus::Paused);
assert_eq!(first.rows_written, 1);
assert_eq!(sink.rows().unwrap().len(), 1);
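// Second run resumes from the stored checkpoint and drains the remaining rows.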
let resumed = PipelineRuntime::new(
Pipeline::new(
"pipeline-resume",
"Resume Pipeline",
"source",
"transform",
"sink",
)
.with_batch_size(1),
source.clone(),
Arc::new(PassthroughTransform),
sink.clone(),
checkpoints.clone(),
dead_letters.clone(),
events.clone(),
)
.execute_manual(context.clone(), PipelineControl::default(), None)
.unwrap();
assert_eq!(resumed.status, PipelineRunStatus::Succeeded);
assert_eq!(sink.rows().unwrap().len(), 3);
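// Third run starts past the final checkpoint: nothing new is read, and the
// idempotent sink still holds exactly three rows.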
let repeat = PipelineRuntime::new(
Pipeline::new(
"pipeline-resume",
"Resume Pipeline",
"source",
"transform",
"sink",
)
.with_batch_size(1),
source,
Arc::new(PassthroughTransform),
sink.clone(),
checkpoints,
dead_letters,
events,
)
.execute_manual(context, PipelineControl::default(), None)
.unwrap();
assert_eq!(repeat.rows_written, 0);
assert_eq!(sink.rows().unwrap().len(), 3);
}
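// Transform that dead-letters any record whose "status" field is "bad" and
// passes everything else through.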
#[derive(Debug, Clone)]
struct StatusAwareTransform;
impl Transform for StatusAwareTransform {
fn transform_name(&self) -> &str {
"status-aware"
}
fn transform_batch(
&self,
_pipeline: &Pipeline,
_run: &PipelineRun,
records: Vec<Row>,
_context: &QueryContext,
) -> DataResult<TransformBatch> {
let mut ok = Vec::new();
let mut dead_letters = Vec::new();
for record in records {
if record
.get("status")
.and_then(Value::as_str)
.is_some_and(|status| status == "bad")
{
dead_letters.push(PipelineDeadLetterInput {
stage: PipelineStage::Transform,
reason: "blocked status".to_string(),
record,
});
} else {
ok.push(record);
}
}
Ok(TransformBatch {
records: ok,
dead_letters,
})
}
}
#[test]
fn pipeline_runtime_routes_transform_dead_letters_and_tracks_counts() {
let runtime = PipelineRuntime::new(
Pipeline::new(
"pipeline-deadletter",
"Deadletter Pipeline",
"source",
"status-aware",
"sink",
)
.with_batch_size(10),
Arc::new(InMemoryRecordSource::new(
"in-memory",
vec![
row(&[("id", json!(1)), ("status", json!("ok"))]),
row(&[("id", json!(2)), ("status", json!("bad"))]),
row(&[("id", json!(3)), ("status", json!("ok"))]),
],
)),
Arc::new(StatusAwareTransform),
Arc::new(IdempotentInMemorySink::new("sink", "id")),
Arc::new(InMemoryCheckpointStore::default()),
Arc::new(InMemoryDeadLetterStore::default()),
Arc::new(InMemoryPipelineEventStore::default()),
);
let run = runtime
.execute_manual(QueryContext::default(), PipelineControl::default(), None)
.unwrap();
assert_eq!(run.status, PipelineRunStatus::Succeeded);
assert_eq!(run.rows_dead_letter, 1);
let snapshot = runtime.operation_snapshot(&run).unwrap();
assert_eq!(snapshot.dead_letters.len(), 1);
assert!(snapshot.dead_letters[0].reason.contains("blocked status"));
}
#[test]
fn pipeline_runtime_respects_operator_pause_and_cancel_controls() {
let runtime = PipelineRuntime::new(
Pipeline::new(
"pipeline-control",
"Control Pipeline",
"source",
"transform",
"sink",
)
.with_batch_size(1),
Arc::new(InMemoryRecordSource::new(
"in-memory",
vec![
row(&[("id", json!(1))]),
row(&[("id", json!(2))]),
row(&[("id", json!(3))]),
],
)),
Arc::new(PassthroughTransform),
Arc::new(IdempotentInMemorySink::new("sink", "id")),
Arc::new(InMemoryCheckpointStore::default()),
Arc::new(InMemoryDeadLetterStore::default()),
Arc::new(InMemoryPipelineEventStore::default()),
);
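// A control paused (or canceled) before execution should park the run
// immediately, before any rows are read.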
let paused_control = PipelineControl::default();
paused_control.pause();
let paused = runtime
.execute_manual(QueryContext::default(), paused_control, None)
.unwrap();
assert_eq!(paused.status, PipelineRunStatus::Paused);
assert_eq!(paused.rows_read, 0);
let canceled_control = PipelineControl::default();
canceled_control.cancel();
let canceled = runtime
.execute_manual(QueryContext::default(), canceled_control, None)
.unwrap();
assert_eq!(canceled.status, PipelineRunStatus::Canceled);
assert_eq!(canceled.rows_read, 0);
}
#[test]
fn file_and_object_store_profiles_roundtrip_rows() {
let tmp = std::env::temp_dir().join(format!("shelly_m60_file_{}.jsonl", now_unix_ms()));
let records = vec![
row(&[("id", json!(1)), ("account", json!("Acme"))]),
row(&[("id", json!(2)), ("account", json!("Globex"))]),
];
write_pipeline_records_to_file(&tmp, &records).unwrap();
let file_source = FileJsonLineSource::new("file-source", &tmp);
let file_batch = file_source
.read_batch(
&Pipeline::new("file", "file", "file", "transform", "sink").with_batch_size(10),
None,
&QueryContext::default(),
)
.unwrap();
assert_eq!(file_batch.records.len(), 2);
let object_store = InMemoryObjectStore::default();
object_store
.put(
"accounts",
records
.iter()
.map(|row| serde_json::to_value(row).unwrap())
.collect(),
)
.unwrap();
let object_source =
ObjectStoreJsonSource::new("object-source", object_store.clone(), "accounts");
let object_batch = object_source
.read_batch(
&Pipeline::new("obj", "obj", "obj", "transform", "sink").with_batch_size(10),
None,
&QueryContext::default(),
)
.unwrap();
assert_eq!(object_batch.records.len(), 2);
let object_sink =
ObjectStoreJsonSink::new("object-sink", object_store.clone(), "sink_rows", "id");
let run = PipelineRun::new(
"run-1".to_string(),
"obj".to_string(),
PipelineRunCommand::Manual { requested_by: None },
&QueryContext::default(),
);
let write = object_sink
.write_batch(
&Pipeline::new("obj", "obj", "obj", "transform", "sink"),
&run,
&object_batch.records,
&QueryContext::default(),
)
.unwrap();
assert_eq!(write.written, 2);
let stored = object_store.get("sink_rows").unwrap();
assert_eq!(stored.len(), 2);
let _ = fs::remove_file(tmp);
}
#[test]
fn pipeline_builder_and_helper_paths_cover_edge_cases() {
let pipeline = Pipeline::new("pipeline-helpers", "Helpers", "source", "transform", "sink")
.with_trigger(PipelineTrigger::EventDriven)
.with_batch_size(0)
.with_max_batches_per_run(Some(0))
.with_deadline_ms(0)
.with_retry_policy(RetryPolicy {
max_attempts: 0,
initial_backoff_ms: 10,
max_backoff_ms: 1,
})
.with_metadata("region", "us-east-1");
assert_eq!(pipeline.trigger, PipelineTrigger::EventDriven);
assert_eq!(pipeline.batch_size, 1);
assert_eq!(pipeline.max_batches_per_run, Some(1));
assert_eq!(pipeline.deadline_ms, 1);
assert_eq!(pipeline.retry_policy.max_attempts, 1);
assert_eq!(pipeline.retry_policy.initial_backoff_ms, 10);
assert_eq!(pipeline.retry_policy.max_backoff_ms, 10);
assert_eq!(
pipeline.metadata.get("region").map(String::as_str),
Some("us-east-1")
);
assert_eq!(
PipelineRunCommand::Manual { requested_by: None }.as_label(),
"manual"
);
assert_eq!(
PipelineRunCommand::Scheduled {
schedule_id: "cron".to_string()
}
.as_label(),
"scheduled"
);
assert_eq!(
PipelineRunCommand::Event {
event: "ingest".to_string(),
payload: json!({"ok": true}),
}
.as_label(),
"event"
);
let mut run = PipelineRun::new(
"run-helpers".to_string(),
pipeline.id.clone(),
PipelineRunCommand::Manual {
requested_by: Some("operator".to_string()),
},
&QueryContext::default().with_tenant_id("tenant-a"),
);
assert_eq!(run.status, PipelineRunStatus::Queued);
run.finish(PipelineRunStatus::Succeeded);
assert_eq!(run.status, PipelineRunStatus::Succeeded);
assert!(run.finished_at_unix_ms.is_some());
let empty_batch = PipelineBatch::empty();
assert!(empty_batch.records.is_empty());
assert!(!empty_batch.has_more);
let passthrough = TransformBatch::passthrough(vec![row(&[("id", json!(1))])]);
assert_eq!(passthrough.records.len(), 1);
assert!(passthrough.dead_letters.is_empty());
let control = PipelineControl::default();
assert!(!control.is_paused());
assert!(!control.is_canceled());
control.pause();
assert!(control.is_paused());
control.resume();
assert!(!control.is_paused());
control.cancel();
assert!(control.is_canceled());
assert_eq!(parse_checkpoint_offset(None), 0);
assert_eq!(parse_checkpoint_offset(Some("")), 0);
assert_eq!(parse_checkpoint_offset(Some("7")), 7);
assert_eq!(parse_checkpoint_offset(Some("query:offset=9:limit=1")), 9);
assert_eq!(parse_checkpoint_offset(Some("offset=abc")), 0);
assert!(is_retryable_pipeline_error(&DataError::Integration(
"transient backend timeout".to_string()
)));
assert!(is_retryable_pipeline_error(&DataError::Query(
"service unavailable".to_string()
)));
assert!(is_retryable_pipeline_error(&DataError::Io(
std::io::Error::other("disk busy")
)));
assert!(!is_retryable_pipeline_error(&DataError::Validation(
"invalid payload".to_string()
)));
let lines = pipeline_records_to_json_lines(&[
row(&[("id", json!(1)), ("name", json!("Acme"))]),
row(&[("id", json!(2)), ("name", json!("Globex"))]),
])
.unwrap();
assert!(lines.ends_with('\n'));
assert_eq!(lines.lines().count(), 2);
}
#[test]
fn in_memory_pipeline_stores_cover_crud_paths() {
let checkpoints = InMemoryCheckpointStore::default();
checkpoints
.save_checkpoint("pipeline-a", Some("tenant-a"), "11")
.unwrap();
assert_eq!(
checkpoints
.load_checkpoint("pipeline-a", Some("tenant-a"))
.unwrap()
.as_deref(),
Some("11")
);
checkpoints
.clear_checkpoint("pipeline-a", Some("tenant-a"))
.unwrap();
assert!(checkpoints
.load_checkpoint("pipeline-a", Some("tenant-a"))
.unwrap()
.is_none());
let dead_letters = InMemoryDeadLetterStore::default();
dead_letters
.push_dead_letter(PipelineDeadLetter {
run_id: "run-1".to_string(),
pipeline_id: "pipeline-a".to_string(),
stage: PipelineStage::Sink,
reason: "duplicate".to_string(),
record: row(&[("id", json!(1))]),
checkpoint: Some("1".to_string()),
attempt: 1,
timestamp_unix_ms: now_unix_ms(),
})
.unwrap();
dead_letters
.push_dead_letter(PipelineDeadLetter {
run_id: "run-2".to_string(),
pipeline_id: "pipeline-a".to_string(),
stage: PipelineStage::Transform,
reason: "invalid".to_string(),
record: row(&[("id", json!(2))]),
checkpoint: Some("2".to_string()),
attempt: 2,
timestamp_unix_ms: now_unix_ms(),
})
.unwrap();
let run1_letters = dead_letters.list_dead_letters("run-1").unwrap();
assert_eq!(run1_letters.len(), 1);
assert_eq!(run1_letters[0].reason, "duplicate");
let events = InMemoryPipelineEventStore::default();
events
.append_event(PipelineEvent {
run_id: "run-1".to_string(),
pipeline_id: "pipeline-a".to_string(),
timestamp_unix_ms: now_unix_ms(),
kind: PipelineEventKind::RunStarted,
message: "started".to_string(),
tenant_id: Some("tenant-a".to_string()),
trace_id: None,
correlation_id: Some("corr-1".to_string()),
request_id: Some("req-1".to_string()),
metrics: BTreeMap::from_iter([("rows_read".to_string(), 0u64)]),
metadata: BTreeMap::from_iter([("source".to_string(), "in-memory".to_string())]),
})
.unwrap();
events
.append_event(PipelineEvent {
run_id: "run-2".to_string(),
pipeline_id: "pipeline-a".to_string(),
timestamp_unix_ms: now_unix_ms(),
kind: PipelineEventKind::RunSucceeded,
message: "done".to_string(),
tenant_id: Some("tenant-a".to_string()),
trace_id: None,
correlation_id: Some("corr-2".to_string()),
request_id: Some("req-2".to_string()),
metrics: BTreeMap::new(),
metadata: BTreeMap::new(),
})
.unwrap();
let run1_events = events.list_events("run-1").unwrap();
assert_eq!(run1_events.len(), 1);
assert_eq!(run1_events[0].kind, PipelineEventKind::RunStarted);
}
#[test]
fn source_profiles_cover_missing_and_invalid_payload_paths() {
let pipeline = Pipeline::new("source-checks", "Source Checks", "source", "t", "sink")
.with_batch_size(2);
let missing_file =
std::env::temp_dir().join(format!("shelly_missing_{}.jsonl", now_unix_ms()));
let missing_source = FileJsonLineSource::new("missing-file", &missing_file);
let missing_batch = missing_source
.read_batch(&pipeline, Some("offset=3"), &QueryContext::default())
.unwrap();
assert!(missing_batch.records.is_empty());
assert_eq!(missing_batch.next_checkpoint.as_deref(), Some("3"));
assert_eq!(Source::source_name(&missing_source), "missing-file");
let invalid_file =
std::env::temp_dir().join(format!("shelly_invalid_{}.jsonl", now_unix_ms()));
fs::write(&invalid_file, "[1,2,3]\n").unwrap();
let invalid_source = FileJsonLineSource::new("invalid-file", &invalid_file);
let err = invalid_source
.read_batch(&pipeline, None, &QueryContext::default())
.unwrap_err()
.to_string();
assert!(err.contains("expected JSON object row"));
let _ = fs::remove_file(&invalid_file);
let object_store = InMemoryObjectStore::default();
object_store
.put("bad_rows", vec![json!(["not-an-object"])])
.unwrap();
let object_source = ObjectStoreJsonSource::new("object-source", object_store, "bad_rows");
let err = object_source
.read_batch(&pipeline, None, &QueryContext::default())
.unwrap_err()
.to_string();
assert!(err.contains("expected JSON object rows"));
}
#[test]
fn enterprise_pipeline_adapters_cover_success_and_error_paths() {
let sql_rows = vec![
row(&[("tenant", json!("north")), ("active_accounts", json!(42))]),
row(&[("tenant", json!("south")), ("active_accounts", json!(27))]),
];
let search_rows = vec![
StoredRow {
id: 1,
data: row(&[("account", json!("Acme")), ("status", json!("renewal"))]),
},
StoredRow {
id: 2,
data: row(&[("account", json!("Globex")), ("status", json!("new"))]),
},
];
let pipeline = Pipeline::new("enterprise", "Enterprise", "source", "transform", "sink")
.with_batch_size(2);
let context = QueryContext::default()
.with_tenant_id("tenant-a")
.with_correlation_id("corr-a")
.with_request_id("req-a");
let query = SqlCommand::new(
"SELECT tenant, active_accounts FROM tenant_rollup",
Vec::new(),
);
let singlestore_source = SingleStorePipelineSource::new(
Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
query.clone(),
"tenant_rollup",
)
.with_contract(AdapterCallContract::default_query().with_timeout_ms(5_000))
.with_artificial_delay_ms(0);
let batch = singlestore_source
.read_batch(&pipeline, Some("offset=0"), &context)
.unwrap();
assert_eq!(batch.records.len(), 2);
assert_eq!(Source::source_name(&singlestore_source), "singlestore");
let clickhouse_source = ClickHousePipelineSource::new(
Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
query.clone(),
"tenant_rollup",
)
.with_contract(AdapterCallContract::low_latency().with_timeout_ms(1_000))
.with_artificial_delay_ms(0);
let batch = clickhouse_source
.read_batch(&pipeline, Some("offset=0"), &context)
.unwrap();
assert_eq!(batch.records.len(), 2);
let bigquery_source = BigQueryPipelineSource::new(
Arc::new(InMemoryBigQueryAdapter::new(sql_rows.clone())),
query.clone(),
"tenant_rollup",
)
.with_contract(AdapterCallContract::default_query())
.with_artificial_delay_ms(0);
let batch = bigquery_source
.read_batch(&pipeline, Some("offset=0"), &context)
.unwrap();
assert_eq!(batch.records.len(), 2);
let opensearch_source = OpenSearchPipelineSource::new(
Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
SearchRequest::new("accounts", ""),
)
.with_contract(AdapterCallContract::default_query())
.with_artificial_delay_ms(0);
let batch = opensearch_source
.read_batch(&pipeline, Some("offset=0"), &context)
.unwrap();
assert_eq!(batch.records.len(), 2);
let err = SingleStorePipelineSource::new(
Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
SqlCommand::new("", Vec::new()),
"tenant_rollup",
)
.read_batch(&pipeline, None, &context)
.unwrap_err()
.to_string();
assert!(err.contains("empty SQL statement"));
let err = OpenSearchPipelineSource::new(
Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
SearchRequest::new("", ""),
)
.read_batch(&pipeline, None, &context)
.unwrap_err()
.to_string();
assert!(err.contains("search index must not be empty"));
let run = PipelineRun::new(
"run-1".to_string(),
"enterprise".to_string(),
PipelineRunCommand::Manual { requested_by: None },
&context,
);
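// The duplicate id=1 row exercises idempotent writes: each sink keyed on
// "id" should persist two unique rows.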
let records = vec![
row(&[("id", json!(1)), ("account", json!("Acme"))]),
row(&[("id", json!(1)), ("account", json!("Acme"))]),
row(&[("id", json!(2)), ("account", json!("Globex"))]),
];
let singlestore_sink = SingleStorePipelineSink::new(
Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
"id",
)
.with_probe_statement("SELECT 1")
.with_contract(AdapterCallContract::default_query());
let write = singlestore_sink
.write_batch(&pipeline, &run, &records, &context)
.unwrap();
assert_eq!(write.written, 2);
assert_eq!(write.duplicate_writes, 1);
assert_eq!(Sink::sink_name(&singlestore_sink), "singlestore");
assert!(format!("{singlestore_sink:?}").contains("SingleStorePipelineSink"));
let err = SingleStorePipelineSink::new(
Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
"id",
)
.with_probe_statement("")
.write_batch(&pipeline, &run, &records, &context)
.unwrap_err()
.to_string();
assert!(err.contains("empty SQL statement"));
let clickhouse_sink = ClickHousePipelineSink::new(
Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
"id",
);
let write = clickhouse_sink
.write_batch(&pipeline, &run, &records, &context)
.unwrap();
assert_eq!(write.written, 2);
assert_eq!(Sink::sink_name(&clickhouse_sink), "clickhouse");
assert!(format!("{clickhouse_sink:?}").contains("ClickHousePipelineSink"));
let bigquery_sink =
BigQueryPipelineSink::new(Arc::new(InMemoryBigQueryAdapter::new(sql_rows)), "id");
let write = bigquery_sink
.write_batch(&pipeline, &run, &records, &context)
.unwrap();
assert_eq!(write.written, 2);
assert_eq!(Sink::sink_name(&bigquery_sink), "bigquery");
assert!(format!("{bigquery_sink:?}").contains("BigQueryPipelineSink"));
let opensearch_sink = OpenSearchPipelineSink::new(
Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
"id",
"accounts",
);
let write = opensearch_sink
.write_batch(&pipeline, &run, &records, &context)
.unwrap();
assert_eq!(write.written, 2);
assert_eq!(Sink::sink_name(&opensearch_sink), "opensearch");
assert!(format!("{opensearch_sink:?}").contains("OpenSearchPipelineSink"));
let err = OpenSearchPipelineSink::new(
Arc::new(InMemoryOpenSearchAdapter::new(search_rows)),
"id",
"",
)
.write_batch(&pipeline, &run, &records, &context)
.unwrap_err()
.to_string();
assert!(err.contains("search index must not be empty"));
}
#[test]
fn conformance_report_tracks_failures() {
let mut report = PipelineConformanceReport::default();
report.add_pass("checkpoint_resume", "ok");
assert!(report.passed());
report.add_fail("retry", "transient error");
assert!(!report.passed());
assert_eq!(report.checks.len(), 2);
assert!(!report.checks[1].passed);
}
#[test]
fn pipeline_adapter_conformance_suite_passes_with_reference_adapters() {
let sql_rows = vec![
row(&[("tenant", json!("north")), ("active_accounts", json!(42))]),
row(&[("tenant", json!("south")), ("active_accounts", json!(27))]),
];
let search_rows = vec![
StoredRow {
id: 1,
data: row(&[("account", json!("Acme")), ("status", json!("renewal"))]),
},
StoredRow {
id: 2,
data: row(&[("account", json!("Globex")), ("status", json!("new"))]),
},
];
let report = run_pipeline_adapter_conformance_suite(
Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
Arc::new(InMemoryBigQueryAdapter::new(sql_rows)),
Arc::new(InMemoryOpenSearchAdapter::new(search_rows)),
&QueryContext::default()
.with_tenant_id("tenant-a")
.with_correlation_id("m60-correlation")
.with_request_id("m60-request"),
);
assert!(
report.passed(),
"expected conformance suite to pass, got {:?}",
report
);
assert!(report
.checks
.iter()
.any(|check| check.name.ends_with(".timeout")));
assert!(report
.checks
.iter()
.any(|check| check.name.ends_with(".retry")));
assert!(report
.checks
.iter()
.any(|check| check.name.ends_with(".checkpoint_resume")));
}
#[test]
fn pipeline_runtime_command_wrappers_execute_scheduled_and_event() {
let runtime = PipelineRuntime::new(
Pipeline::new(
"pipeline-command-wrappers",
"Pipeline Command Wrappers",
"sequence",
"passthrough",
"sink",
),
Arc::new(SequenceSource::new(
"sequence",
[Ok(PipelineBatch::empty()), Ok(PipelineBatch::empty())],
)),
Arc::new(PassthroughTransform),
Arc::new(IdempotentInMemorySink::new("sink", "id")),
Arc::new(InMemoryCheckpointStore::default()),
Arc::new(InMemoryDeadLetterStore::default()),
Arc::new(InMemoryPipelineEventStore::default()),
);
let context = QueryContext::default().with_tenant_id("tenant-schedule");
let scheduled = runtime
.execute_scheduled(
context.clone(),
PipelineControl::default(),
"schedule-nightly",
)
.unwrap();
assert_eq!(scheduled.status, PipelineRunStatus::Succeeded);
assert!(matches!(
scheduled.command,
PipelineRunCommand::Scheduled { .. }
));
let event = runtime
.execute_event(
context,
PipelineControl::default(),
"ingest",
json!({"reason": "manual"}),
)
.unwrap();
assert_eq!(event.status, PipelineRunStatus::Succeeded);
assert!(matches!(event.command, PipelineRunCommand::Event { .. }));
}
#[test]
fn pipeline_runtime_covers_source_error_and_deadline_paths() {
let source_failure_runtime = PipelineRuntime::new(
Pipeline::new(
"pipeline-source-failure",
"Pipeline Source Failure",
"sequence",
"passthrough",
"sink",
)
.with_retry_policy(RetryPolicy::never()),
Arc::new(SequenceSource::new(
"sequence",
[Err(DataError::Validation(
"source validation failure".to_string(),
))],
)),
Arc::new(PassthroughTransform),
Arc::new(IdempotentInMemorySink::new("sink", "id")),
Arc::new(InMemoryCheckpointStore::default()),
Arc::new(InMemoryDeadLetterStore::default()),
Arc::new(InMemoryPipelineEventStore::default()),
);
let source_failed = source_failure_runtime
.execute_manual(QueryContext::default(), PipelineControl::default(), None)
.unwrap();
assert_eq!(source_failed.status, PipelineRunStatus::Failed);
assert!(source_failed
.errors
.iter()
.any(|error| error.contains("source validation failure")));
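// Deadline path: a 1 ms budget plus a sink probe that sleeps 5 ms per write
// guarantees the deadline trips before the batch completes.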
let deadline_runtime = PipelineRuntime::new(
Pipeline::new(
"pipeline-deadline",
"Pipeline Deadline",
"sequence",
"passthrough",
"sink",
)
.with_batch_size(1)
.with_deadline_ms(1),
Arc::new(SequenceSource::new(
"sequence",
[Ok(PipelineBatch {
records: vec![row(&[("id", json!(1))])],
next_checkpoint: Some("1".to_string()),
has_more: true,
lineage: Some("deadline-test".to_string()),
source_freshness_unix_ms: Some(now_unix_ms()),
})],
)),
Arc::new(PassthroughTransform),
Arc::new(
IdempotentInMemorySink::new("sink", "id").with_probe(Arc::new(|_context| {
thread::sleep(Duration::from_millis(5));
Ok(())
})),
),
Arc::new(InMemoryCheckpointStore::default()),
Arc::new(InMemoryDeadLetterStore::default()),
Arc::new(InMemoryPipelineEventStore::default()),
);
let deadline_failed = deadline_runtime
.execute_manual(QueryContext::default(), PipelineControl::default(), None)
.unwrap();
assert_eq!(deadline_failed.status, PipelineRunStatus::Failed);
assert!(deadline_failed
.errors
.iter()
.any(|error| error.contains("deadline exceeded")));
}
#[test]
fn pipeline_runtime_covers_transform_and_sink_failure_paths() {
let transform_failure_source = Arc::new(SequenceSource::new(
"sequence",
[Ok(PipelineBatch {
records: vec![
row(&[("id", json!(1)), ("tenant", json!("a"))]),
row(&[("id", json!(2)), ("tenant", json!("b"))]),
],
next_checkpoint: Some("2".to_string()),
has_more: false,
lineage: Some("transform-failure".to_string()),
source_freshness_unix_ms: Some(now_unix_ms()),
})],
));
let transform_failure_dead_letters = Arc::new(InMemoryDeadLetterStore::default());
let transform_failure_runtime = PipelineRuntime::new(
Pipeline::new(
"pipeline-transform-failure",
"Pipeline Transform Failure",
"sequence",
"always-fail",
"sink",
),
transform_failure_source,
Arc::new(AlwaysFailTransform {
message: "transform denied row".to_string(),
}),
Arc::new(IdempotentInMemorySink::new("sink", "id")),
Arc::new(InMemoryCheckpointStore::default()),
transform_failure_dead_letters.clone(),
Arc::new(InMemoryPipelineEventStore::default()),
);
let transform_failed = transform_failure_runtime
.execute_manual(QueryContext::default(), PipelineControl::default(), None)
.unwrap();
assert_eq!(transform_failed.status, PipelineRunStatus::Failed);
assert_eq!(transform_failed.rows_dead_letter, 2);
assert_eq!(transform_failed.last_checkpoint.as_deref(), Some("2"));
assert!(transform_failed
.errors
.iter()
.any(|error| error.contains("transform denied row")));
let transform_snapshot = transform_failure_runtime
.operation_snapshot(&transform_failed)
.unwrap();
assert_eq!(transform_snapshot.dead_letters.len(), 2);
assert!(transform_snapshot
.dead_letters
.iter()
.all(|dead_letter| dead_letter.stage == PipelineStage::Transform));
assert_eq!(
transform_failure_dead_letters
.list_dead_letters(&transform_failed.id)
.unwrap()
.len(),
2
);
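// Non-retryable sink failures dead-letter the batch rather than failing the
// run, so the run reports success with zero rows written.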
let sink_failure_source = Arc::new(SequenceSource::new(
"sequence",
[Ok(PipelineBatch {
records: vec![
row(&[("id", json!(10)), ("tenant", json!("west"))]),
row(&[("id", json!(11)), ("tenant", json!("east"))]),
],
next_checkpoint: Some("2".to_string()),
has_more: false,
lineage: Some("sink-failure".to_string()),
source_freshness_unix_ms: Some(now_unix_ms()),
})],
));
let sink_failure_runtime = PipelineRuntime::new(
Pipeline::new(
"pipeline-sink-failure",
"Pipeline Sink Failure",
"sequence",
"passthrough",
"always-fail",
),
sink_failure_source,
Arc::new(PassthroughTransform),
Arc::new(AlwaysFailSink {
sink_name: "always-fail".to_string(),
message: "sink rejected rows".to_string(),
}),
Arc::new(InMemoryCheckpointStore::default()),
Arc::new(InMemoryDeadLetterStore::default()),
Arc::new(InMemoryPipelineEventStore::default()),
);
let sink_run = sink_failure_runtime
.execute_manual(QueryContext::default(), PipelineControl::default(), None)
.unwrap();
assert_eq!(sink_run.status, PipelineRunStatus::Succeeded);
assert_eq!(sink_run.rows_written, 0);
assert_eq!(sink_run.rows_dead_letter, 2);
assert!(sink_run
.errors
.iter()
.any(|error| error.contains("sink rejected rows")));
let sink_snapshot = sink_failure_runtime.operation_snapshot(&sink_run).unwrap();
assert_eq!(sink_snapshot.dead_letters.len(), 2);
assert!(sink_snapshot
.dead_letters
.iter()
.all(|dead_letter| dead_letter.stage == PipelineStage::Sink));
}
}