shelly_data/pipeline.rs

use crate::{
    integrations::{
        map_integration_result, run_with_contract, AdapterCallContract, BigQueryAdapter,
        ClickHouseAdapter, DataWindowRequest, OpenSearchAdapter, QueryContext, RetryPolicy,
        SearchRequest, SingleStoreAdapter, SqlCommand,
    },
    DataError, DataResult, Row, WireFormatProfile,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    fs::{self, OpenOptions},
    io::{BufRead, BufReader, Write},
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
        Arc, Mutex,
    },
    thread,
    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineTrigger {
    Manual,
    Scheduled,
    EventDriven,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Pipeline {
    pub id: String,
    pub name: String,
    pub source: String,
    pub transform: String,
    pub sink: String,
    pub trigger: PipelineTrigger,
    pub batch_size: usize,
    pub max_batches_per_run: Option<usize>,
    pub deadline_ms: u64,
    pub retry_policy: RetryPolicy,
    pub metadata: BTreeMap<String, String>,
}

impl Pipeline {
    pub fn new(
        id: impl Into<String>,
        name: impl Into<String>,
        source: impl Into<String>,
        transform: impl Into<String>,
        sink: impl Into<String>,
    ) -> Self {
        Self {
            id: id.into(),
            name: name.into(),
            source: source.into(),
            transform: transform.into(),
            sink: sink.into(),
            trigger: PipelineTrigger::Manual,
            batch_size: 500,
            max_batches_per_run: None,
            deadline_ms: 30_000,
            retry_policy: RetryPolicy::conservative(),
            metadata: BTreeMap::new(),
        }
    }

    pub fn with_trigger(mut self, trigger: PipelineTrigger) -> Self {
        self.trigger = trigger;
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size.max(1);
        self
    }

    pub fn with_max_batches_per_run(mut self, max_batches_per_run: Option<usize>) -> Self {
        self.max_batches_per_run = max_batches_per_run.map(|value| value.max(1));
        self
    }

    pub fn with_deadline_ms(mut self, deadline_ms: u64) -> Self {
        self.deadline_ms = deadline_ms.max(1);
        self
    }

    pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
        self.retry_policy = RetryPolicy {
            max_attempts: retry_policy.max_attempts.max(1),
            initial_backoff_ms: retry_policy.initial_backoff_ms,
            max_backoff_ms: retry_policy
                .max_backoff_ms
                .max(retry_policy.initial_backoff_ms),
        };
        self
    }

    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.insert(key.into(), value.into());
        self
    }
}
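
// Illustrative builder usage (a sketch, not part of this module's API; the
// identifiers and values below are made up for the example):
//
//     let pipeline = Pipeline::new("orders-etl", "Orders ETL", "orders-source",
//             "passthrough", "orders-sink")
//         .with_trigger(PipelineTrigger::Scheduled)
//         .with_batch_size(1_000)
//         .with_max_batches_per_run(Some(10))
//         .with_deadline_ms(60_000)
//         .with_metadata("owner", "data-platform");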

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum PipelineRunCommand {
    Manual { requested_by: Option<String> },
    Scheduled { schedule_id: String },
    Event { event: String, payload: Value },
}

impl PipelineRunCommand {
    fn as_label(&self) -> &'static str {
        match self {
            Self::Manual { .. } => "manual",
            Self::Scheduled { .. } => "scheduled",
            Self::Event { .. } => "event",
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineRunStatus {
    Queued,
    Running,
    Paused,
    Succeeded,
    Failed,
    Canceled,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineStage {
    Source,
    Transform,
    Sink,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineRun {
    pub id: String,
    pub pipeline_id: String,
    pub command: PipelineRunCommand,
    pub status: PipelineRunStatus,
    pub tenant_id: Option<String>,
    pub trace_id: Option<String>,
    pub started_at_unix_ms: u64,
    pub finished_at_unix_ms: Option<u64>,
    pub resumed_from_checkpoint: Option<String>,
    pub last_checkpoint: Option<String>,
    pub rows_read: u64,
    pub rows_transformed: u64,
    pub rows_written: u64,
    pub rows_dead_letter: u64,
    pub duplicate_writes: u64,
    pub retries: u64,
    pub latest_lineage: Option<String>,
    pub latest_freshness_lag_ms: Option<u64>,
    pub errors: Vec<String>,
}

impl PipelineRun {
    fn new(
        id: String,
        pipeline_id: String,
        command: PipelineRunCommand,
        context: &QueryContext,
    ) -> Self {
        Self {
            id,
            pipeline_id,
            command,
            status: PipelineRunStatus::Queued,
            tenant_id: context.tenant_id.clone(),
            trace_id: context.trace_id.clone(),
            started_at_unix_ms: now_unix_ms(),
            finished_at_unix_ms: None,
            resumed_from_checkpoint: None,
            last_checkpoint: None,
            rows_read: 0,
            rows_transformed: 0,
            rows_written: 0,
            rows_dead_letter: 0,
            duplicate_writes: 0,
            retries: 0,
            latest_lineage: None,
            latest_freshness_lag_ms: None,
            errors: Vec::new(),
        }
    }

    fn finish(&mut self, status: PipelineRunStatus) {
        self.status = status;
        self.finished_at_unix_ms = Some(now_unix_ms());
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineEventKind {
    RunQueued,
    RunStarted,
    RunResumed,
    BatchRead,
    BatchTransformed,
    BatchWritten,
    CheckpointSaved,
    Retrying,
    DeadLettered,
    RunPaused,
    RunCanceled,
    RunSucceeded,
    RunFailed,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineEvent {
    pub run_id: String,
    pub pipeline_id: String,
    pub timestamp_unix_ms: u64,
    pub kind: PipelineEventKind,
    pub message: String,
    pub tenant_id: Option<String>,
    pub trace_id: Option<String>,
    pub correlation_id: Option<String>,
    pub request_id: Option<String>,
    pub metrics: BTreeMap<String, u64>,
    pub metadata: BTreeMap<String, String>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineBatch {
    pub records: Vec<Row>,
    pub next_checkpoint: Option<String>,
    pub has_more: bool,
    pub lineage: Option<String>,
    pub source_freshness_unix_ms: Option<u64>,
}

impl PipelineBatch {
    pub fn empty() -> Self {
        Self {
            records: Vec::new(),
            next_checkpoint: None,
            has_more: false,
            lineage: None,
            source_freshness_unix_ms: None,
        }
    }
}
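
// Reading note (descriptive, based on the `execute` loop later in this file):
// a source signals completion by returning `has_more: false`; when
// `next_checkpoint` is `Some`, the runtime persists it after the batch is
// processed so a later run can resume from that position.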

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineDeadLetterInput {
    pub stage: PipelineStage,
    pub reason: String,
    pub record: Row,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TransformBatch {
    pub records: Vec<Row>,
    pub dead_letters: Vec<PipelineDeadLetterInput>,
}

impl TransformBatch {
    pub fn passthrough(records: Vec<Row>) -> Self {
        Self {
            records,
            dead_letters: Vec::new(),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SinkWriteResult {
    pub written: usize,
    pub duplicate_writes: usize,
    pub dead_letters: Vec<PipelineDeadLetterInput>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineDeadLetter {
    pub run_id: String,
    pub pipeline_id: String,
    pub stage: PipelineStage,
    pub reason: String,
    pub record: Row,
    pub checkpoint: Option<String>,
    pub attempt: u32,
    pub timestamp_unix_ms: u64,
}

pub trait Source: Send + Sync {
    fn source_name(&self) -> &str;
    fn read_batch(
        &self,
        pipeline: &Pipeline,
        checkpoint: Option<&str>,
        context: &QueryContext,
    ) -> DataResult<PipelineBatch>;
}

pub trait Transform: Send + Sync {
    fn transform_name(&self) -> &str;
    fn transform_batch(
        &self,
        pipeline: &Pipeline,
        run: &PipelineRun,
        records: Vec<Row>,
        context: &QueryContext,
    ) -> DataResult<TransformBatch>;
}

#[derive(Debug, Clone, Copy, Default)]
pub struct PassthroughTransform;

impl Transform for PassthroughTransform {
    fn transform_name(&self) -> &str {
        "passthrough"
    }

    fn transform_batch(
        &self,
        _pipeline: &Pipeline,
        _run: &PipelineRun,
        records: Vec<Row>,
        _context: &QueryContext,
    ) -> DataResult<TransformBatch> {
        Ok(TransformBatch::passthrough(records))
    }
}
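
// Sketch of a custom `Transform` (illustrative only): route records missing a
// required field to the dead-letter queue instead of failing the whole batch.
// It assumes `Row` exposes a map-style `get`, as the rest of this file relies
// on.
//
//     struct RequireField {
//         field: String,
//     }
//
//     impl Transform for RequireField {
//         fn transform_name(&self) -> &str {
//             "require-field"
//         }
//
//         fn transform_batch(
//             &self,
//             _pipeline: &Pipeline,
//             _run: &PipelineRun,
//             records: Vec<Row>,
//             _context: &QueryContext,
//         ) -> DataResult<TransformBatch> {
//             let mut batch = TransformBatch::passthrough(Vec::new());
//             for record in records {
//                 if record.get(&self.field).is_some() {
//                     batch.records.push(record);
//                 } else {
//                     batch.dead_letters.push(PipelineDeadLetterInput {
//                         stage: PipelineStage::Transform,
//                         reason: format!("missing field `{}`", self.field),
//                         record,
//                     });
//                 }
//             }
//             Ok(batch)
//         }
//     }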

pub trait Sink: Send + Sync {
    fn sink_name(&self) -> &str;
    fn write_batch(
        &self,
        pipeline: &Pipeline,
        run: &PipelineRun,
        records: &[Row],
        context: &QueryContext,
    ) -> DataResult<SinkWriteResult>;
}

pub trait CheckpointStore: Send + Sync {
    fn load_checkpoint(
        &self,
        pipeline_id: &str,
        tenant_id: Option<&str>,
    ) -> DataResult<Option<String>>;
    fn save_checkpoint(
        &self,
        pipeline_id: &str,
        tenant_id: Option<&str>,
        checkpoint: &str,
    ) -> DataResult<()>;
    fn clear_checkpoint(&self, pipeline_id: &str, tenant_id: Option<&str>) -> DataResult<()>;
}

pub trait DeadLetterStore: Send + Sync {
    fn push_dead_letter(&self, dead_letter: PipelineDeadLetter) -> DataResult<()>;
    fn list_dead_letters(&self, run_id: &str) -> DataResult<Vec<PipelineDeadLetter>>;
}

pub trait PipelineEventStore: Send + Sync {
    fn append_event(&self, event: PipelineEvent) -> DataResult<()>;
    fn list_events(&self, run_id: &str) -> DataResult<Vec<PipelineEvent>>;
}

#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
    inner: Arc<Mutex<HashMap<String, String>>>,
}

impl InMemoryCheckpointStore {
    fn key(pipeline_id: &str, tenant_id: Option<&str>) -> String {
        format!("{}::{}", tenant_id.unwrap_or("-"), pipeline_id)
    }
}

impl CheckpointStore for InMemoryCheckpointStore {
    fn load_checkpoint(
        &self,
        pipeline_id: &str,
        tenant_id: Option<&str>,
    ) -> DataResult<Option<String>> {
        let key = Self::key(pipeline_id, tenant_id);
        let guard = self
            .inner
            .lock()
            .map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
        Ok(guard.get(&key).cloned())
    }

    fn save_checkpoint(
        &self,
        pipeline_id: &str,
        tenant_id: Option<&str>,
        checkpoint: &str,
    ) -> DataResult<()> {
        let key = Self::key(pipeline_id, tenant_id);
        let mut guard = self
            .inner
            .lock()
            .map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
        guard.insert(key, checkpoint.to_string());
        Ok(())
    }

    fn clear_checkpoint(&self, pipeline_id: &str, tenant_id: Option<&str>) -> DataResult<()> {
        let key = Self::key(pipeline_id, tenant_id);
        let mut guard = self
            .inner
            .lock()
            .map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
        guard.remove(&key);
        Ok(())
    }
}
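
// Key layout example for the in-memory store above (descriptive): tenant
// `acme` and pipeline `orders-etl` map to the key `acme::orders-etl`; with no
// tenant the key becomes `-::orders-etl`.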

#[derive(Debug, Clone, Default)]
pub struct InMemoryDeadLetterStore {
    inner: Arc<Mutex<Vec<PipelineDeadLetter>>>,
}

impl DeadLetterStore for InMemoryDeadLetterStore {
    fn push_dead_letter(&self, dead_letter: PipelineDeadLetter) -> DataResult<()> {
        let mut guard = self
            .inner
            .lock()
            .map_err(|_| DataError::Integration("dead-letter store lock poisoned".to_string()))?;
        guard.push(dead_letter);
        Ok(())
    }

    fn list_dead_letters(&self, run_id: &str) -> DataResult<Vec<PipelineDeadLetter>> {
        let guard = self
            .inner
            .lock()
            .map_err(|_| DataError::Integration("dead-letter store lock poisoned".to_string()))?;
        Ok(guard
            .iter()
            .filter(|entry| entry.run_id == run_id)
            .cloned()
            .collect())
    }
}

#[derive(Debug, Clone, Default)]
pub struct InMemoryPipelineEventStore {
    inner: Arc<Mutex<Vec<PipelineEvent>>>,
}

impl PipelineEventStore for InMemoryPipelineEventStore {
    fn append_event(&self, event: PipelineEvent) -> DataResult<()> {
        let mut guard = self.inner.lock().map_err(|_| {
            DataError::Integration("pipeline event store lock poisoned".to_string())
        })?;
        guard.push(event);
        Ok(())
    }

    fn list_events(&self, run_id: &str) -> DataResult<Vec<PipelineEvent>> {
        let guard = self.inner.lock().map_err(|_| {
            DataError::Integration("pipeline event store lock poisoned".to_string())
        })?;
        Ok(guard
            .iter()
            .filter(|event| event.run_id == run_id)
            .cloned()
            .collect())
    }
}

#[derive(Debug, Clone, Default)]
pub struct PipelineControl {
    paused: Arc<AtomicBool>,
    canceled: Arc<AtomicBool>,
}

impl PipelineControl {
    pub fn pause(&self) {
        self.paused.store(true, Ordering::SeqCst);
    }

    pub fn resume(&self) {
        self.paused.store(false, Ordering::SeqCst);
    }

    pub fn cancel(&self) {
        self.canceled.store(true, Ordering::SeqCst);
    }

    pub fn is_paused(&self) -> bool {
        self.paused.load(Ordering::SeqCst)
    }

    pub fn is_canceled(&self) -> bool {
        self.canceled.load(Ordering::SeqCst)
    }
}
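
// Operator-control sketch (illustrative): `PipelineControl` clones share the
// same `Arc`-backed flags, so one handle can be given to the runtime while a
// supervising thread keeps another for pause/cancel.
//
//     let control = PipelineControl::default();
//     let handle = control.clone();
//     // hand `handle` to the runtime, then elsewhere:
//     control.pause();   // the next loop iteration finishes as Paused
//     control.cancel();  // or as Canceled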

pub struct PipelineRuntime {
    pipeline: Pipeline,
    source: Arc<dyn Source>,
    transform: Arc<dyn Transform>,
    sink: Arc<dyn Sink>,
    checkpoint_store: Arc<dyn CheckpointStore>,
    dead_letter_store: Arc<dyn DeadLetterStore>,
    event_store: Arc<dyn PipelineEventStore>,
}

impl PipelineRuntime {
    pub fn new(
        pipeline: Pipeline,
        source: Arc<dyn Source>,
        transform: Arc<dyn Transform>,
        sink: Arc<dyn Sink>,
        checkpoint_store: Arc<dyn CheckpointStore>,
        dead_letter_store: Arc<dyn DeadLetterStore>,
        event_store: Arc<dyn PipelineEventStore>,
    ) -> Self {
        Self {
            pipeline,
            source,
            transform,
            sink,
            checkpoint_store,
            dead_letter_store,
            event_store,
        }
    }

    pub fn execute_manual(
        &self,
        context: QueryContext,
        control: PipelineControl,
        requested_by: Option<String>,
    ) -> DataResult<PipelineRun> {
        self.execute(
            PipelineRunCommand::Manual { requested_by },
            context,
            control,
        )
    }

    pub fn execute_scheduled(
        &self,
        context: QueryContext,
        control: PipelineControl,
        schedule_id: impl Into<String>,
    ) -> DataResult<PipelineRun> {
        self.execute(
            PipelineRunCommand::Scheduled {
                schedule_id: schedule_id.into(),
            },
            context,
            control,
        )
    }

    pub fn execute_event(
        &self,
        context: QueryContext,
        control: PipelineControl,
        event: impl Into<String>,
        payload: Value,
    ) -> DataResult<PipelineRun> {
        self.execute(
            PipelineRunCommand::Event {
                event: event.into(),
                payload,
            },
            context,
            control,
        )
    }

    pub fn execute(
        &self,
        command: PipelineRunCommand,
        context: QueryContext,
        control: PipelineControl,
    ) -> DataResult<PipelineRun> {
        let started = Instant::now();
        let run_id = format!("plr-{}-{}", now_unix_ms(), next_counter());
        let mut run = PipelineRun::new(
            run_id.clone(),
            self.pipeline.id.clone(),
            command.clone(),
            &context,
        );
        self.emit_event(
            &run,
            &context,
            PipelineEventKind::RunQueued,
            "pipeline run queued",
            BTreeMap::new(),
            BTreeMap::new(),
        )?;

        run.status = PipelineRunStatus::Running;
        self.emit_event(
            &run,
            &context,
            PipelineEventKind::RunStarted,
            "pipeline run started",
            BTreeMap::new(),
            BTreeMap::from([("command".to_string(), command.as_label().to_string())]),
        )?;

        let tenant_id = context.tenant_id.as_deref();
        let mut checkpoint = self
            .checkpoint_store
            .load_checkpoint(&self.pipeline.id, tenant_id)?;
        if checkpoint.is_some() {
            run.resumed_from_checkpoint = checkpoint.clone();
            self.emit_event(
                &run,
                &context,
                PipelineEventKind::RunResumed,
                "resuming from stored checkpoint",
                BTreeMap::new(),
                BTreeMap::from([(
                    "checkpoint".to_string(),
                    checkpoint.clone().unwrap_or_default(),
                )]),
            )?;
        }

        let mut batches_processed = 0usize;
        loop {
            if control.is_canceled() {
                run.finish(PipelineRunStatus::Canceled);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunCanceled,
                    "pipeline run canceled by operator control",
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }
            if control.is_paused() {
                run.finish(PipelineRunStatus::Paused);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunPaused,
                    "pipeline run paused by operator control",
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }
            if started.elapsed() > Duration::from_millis(self.pipeline.deadline_ms.max(1)) {
                let message = format!(
                    "pipeline run deadline exceeded ({}ms)",
                    self.pipeline.deadline_ms
                );
                run.errors.push(message.clone());
                run.finish(PipelineRunStatus::Failed);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunFailed,
                    &message,
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }
            if let Some(max_batches) = self.pipeline.max_batches_per_run {
                if batches_processed >= max_batches {
                    run.finish(PipelineRunStatus::Paused);
                    self.emit_event(
                        &run,
                        &context,
                        PipelineEventKind::RunPaused,
                        "pipeline run paused at max_batches_per_run budget",
                        BTreeMap::from([("max_batches".to_string(), max_batches as u64)]),
                        BTreeMap::new(),
                    )?;
                    break;
                }
            }

            let source_batch =
                match self.with_retry(PipelineStage::Source, &context, &mut run, |attempt| {
                    self.source
                        .read_batch(&self.pipeline, checkpoint.as_deref(), &context)
                        .map_err(|err| {
                            DataError::Integration(format!(
                                "source read failed at attempt {}: {}",
                                attempt, err
                            ))
                        })
                }) {
                    Ok(batch) => batch,
                    Err(err) => {
                        let message = err.to_string();
                        run.errors.push(message.clone());
                        run.finish(PipelineRunStatus::Failed);
                        self.emit_event(
                            &run,
                            &context,
                            PipelineEventKind::RunFailed,
                            &message,
                            BTreeMap::new(),
                            BTreeMap::new(),
                        )?;
                        break;
                    }
                };

            let read_count = source_batch.records.len() as u64;
            run.rows_read = run.rows_read.saturating_add(read_count);
            if let Some(lineage) = source_batch.lineage.as_ref() {
                run.latest_lineage = Some(lineage.clone());
            }
            if let Some(source_freshness) = source_batch.source_freshness_unix_ms {
                run.latest_freshness_lag_ms = Some(now_unix_ms().saturating_sub(source_freshness));
            }
            self.emit_event(
                &run,
                &context,
                PipelineEventKind::BatchRead,
                "source batch read complete",
                BTreeMap::from([("rows".to_string(), read_count)]),
                BTreeMap::from([
                    (
                        "checkpoint".to_string(),
                        checkpoint.clone().unwrap_or_else(|| "none".to_string()),
                    ),
                    ("has_more".to_string(), source_batch.has_more.to_string()),
                ]),
            )?;

            if source_batch.records.is_empty() && !source_batch.has_more {
                run.finish(PipelineRunStatus::Succeeded);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunSucceeded,
                    "pipeline run completed with no additional source records",
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }

            let run_snapshot = run.clone();
            let transformed =
                match self.with_retry(PipelineStage::Transform, &context, &mut run, |_attempt| {
                    self.transform.transform_batch(
                        &self.pipeline,
                        &run_snapshot,
                        source_batch.records.clone(),
                        &context,
                    )
                }) {
                    Ok(batch) => batch,
                    Err(err) => {
                        let reason = err.to_string();
                        for record in source_batch.records {
                            self.push_dead_letter(
                                &mut run,
                                PipelineDeadLetterInput {
                                    stage: PipelineStage::Transform,
                                    reason: reason.clone(),
                                    record,
                                },
                                checkpoint.clone(),
                                1,
                            )?;
                        }
                        run.errors.push(reason.clone());
                        if let Some(next_checkpoint) = source_batch.next_checkpoint.clone() {
                            self.checkpoint_store.save_checkpoint(
                                &self.pipeline.id,
                                tenant_id,
                                &next_checkpoint,
                            )?;
                            run.last_checkpoint = Some(next_checkpoint.clone());
                            checkpoint = Some(next_checkpoint.clone());
                            self.emit_event(
                                &run,
                                &context,
                                PipelineEventKind::CheckpointSaved,
                                "checkpoint advanced after transform dead-letter fallback",
                                BTreeMap::new(),
                                BTreeMap::from([("checkpoint".to_string(), next_checkpoint)]),
                            )?;
                        }
                        if source_batch.has_more {
                            batches_processed += 1;
                            continue;
                        }
                        run.finish(PipelineRunStatus::Failed);
                        self.emit_event(
                            &run,
                            &context,
                            PipelineEventKind::RunFailed,
                            "transform stage exhausted and no additional source batches remain",
                            BTreeMap::new(),
                            BTreeMap::new(),
                        )?;
                        break;
                    }
                };

            let transformed_count = transformed.records.len() as u64;
            run.rows_transformed = run.rows_transformed.saturating_add(transformed_count);
            self.emit_event(
                &run,
                &context,
                PipelineEventKind::BatchTransformed,
                "transform batch complete",
                BTreeMap::from([("rows".to_string(), transformed_count)]),
                BTreeMap::new(),
            )?;

            for dead_letter in transformed.dead_letters {
                self.push_dead_letter(&mut run, dead_letter, checkpoint.clone(), 1)?;
            }

            let write_result = if transformed.records.is_empty() {
                SinkWriteResult::default()
            } else {
                let run_snapshot = run.clone();
                match self.with_retry(PipelineStage::Sink, &context, &mut run, |_attempt| {
                    self.sink.write_batch(
                        &self.pipeline,
                        &run_snapshot,
                        &transformed.records,
                        &context,
                    )
                }) {
                    Ok(result) => result,
                    Err(err) => {
                        let reason = err.to_string();
                        for record in transformed.records {
                            self.push_dead_letter(
                                &mut run,
                                PipelineDeadLetterInput {
                                    stage: PipelineStage::Sink,
                                    reason: reason.clone(),
                                    record,
                                },
                                checkpoint.clone(),
                                1,
                            )?;
                        }
                        run.errors.push(reason);
                        SinkWriteResult::default()
                    }
                }
            };

            run.rows_written = run.rows_written.saturating_add(write_result.written as u64);
            run.duplicate_writes = run
                .duplicate_writes
                .saturating_add(write_result.duplicate_writes as u64);
            self.emit_event(
                &run,
                &context,
                PipelineEventKind::BatchWritten,
                "sink batch write complete",
                BTreeMap::from([
                    ("written".to_string(), write_result.written as u64),
                    (
                        "duplicate_writes".to_string(),
                        write_result.duplicate_writes as u64,
                    ),
                ]),
                BTreeMap::new(),
            )?;

            for dead_letter in write_result.dead_letters {
                self.push_dead_letter(&mut run, dead_letter, checkpoint.clone(), 1)?;
            }

            if let Some(next_checkpoint) = source_batch.next_checkpoint.clone() {
                self.checkpoint_store.save_checkpoint(
                    &self.pipeline.id,
                    tenant_id,
                    &next_checkpoint,
                )?;
                run.last_checkpoint = Some(next_checkpoint.clone());
                checkpoint = Some(next_checkpoint.clone());
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::CheckpointSaved,
                    "checkpoint saved",
                    BTreeMap::new(),
                    BTreeMap::from([("checkpoint".to_string(), next_checkpoint)]),
                )?;
            }

            batches_processed = batches_processed.saturating_add(1);

            if !source_batch.has_more {
                run.finish(PipelineRunStatus::Succeeded);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunSucceeded,
                    "pipeline run completed successfully",
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }
        }

        if run.finished_at_unix_ms.is_none() {
            run.finished_at_unix_ms = Some(now_unix_ms());
        }

        Ok(run)
    }

    pub fn operation_snapshot(&self, run: &PipelineRun) -> DataResult<PipelineOperationSnapshot> {
        let events = self.event_store.list_events(&run.id)?;
        let dead_letters = self.dead_letter_store.list_dead_letters(&run.id)?;
        Ok(PipelineOperationSnapshot {
            run: run.clone(),
            events,
            dead_letters,
        })
    }

    fn with_retry<T, F>(
        &self,
        stage: PipelineStage,
        context: &QueryContext,
        run: &mut PipelineRun,
        mut operation: F,
    ) -> DataResult<T>
    where
        F: FnMut(u32) -> DataResult<T>,
    {
        let attempts = self.pipeline.retry_policy.max_attempts.max(1);
        let mut backoff_ms = self.pipeline.retry_policy.initial_backoff_ms;

        for attempt in 1..=attempts {
            match operation(attempt) {
                Ok(value) => return Ok(value),
                Err(err) if attempt == attempts || !is_retryable_pipeline_error(&err) => {
                    return Err(err);
                }
                Err(err) => {
                    run.retries = run.retries.saturating_add(1);
                    self.emit_event(
                        run,
                        context,
                        PipelineEventKind::Retrying,
                        "retrying pipeline stage after retryable error",
                        BTreeMap::from([
                            ("attempt".to_string(), attempt as u64),
                            ("max_attempts".to_string(), attempts as u64),
                            ("backoff_ms".to_string(), backoff_ms),
                        ]),
                        BTreeMap::from([
                            (
                                "stage".to_string(),
                                format!("{stage:?}").to_ascii_lowercase(),
                            ),
                            ("error".to_string(), err.to_string()),
                        ]),
                    )?;
                    backoff_ms = (backoff_ms.saturating_mul(2))
                        .min(self.pipeline.retry_policy.max_backoff_ms.max(1));
                }
            }
        }
        Err(DataError::Integration(
            "retry exhausted without terminal error detail".to_string(),
        ))
    }

    fn emit_event(
        &self,
        run: &PipelineRun,
        context: &QueryContext,
        kind: PipelineEventKind,
        message: &str,
        metrics: BTreeMap<String, u64>,
        metadata: BTreeMap<String, String>,
    ) -> DataResult<()> {
        self.event_store.append_event(PipelineEvent {
            run_id: run.id.clone(),
            pipeline_id: run.pipeline_id.clone(),
            timestamp_unix_ms: now_unix_ms(),
            kind,
            message: message.to_string(),
            tenant_id: run.tenant_id.clone(),
            trace_id: run.trace_id.clone(),
            correlation_id: context.correlation_id().map(ToString::to_string),
            request_id: context.request_id().map(ToString::to_string),
            metrics,
            metadata,
        })
    }

    fn push_dead_letter(
        &self,
        run: &mut PipelineRun,
        input: PipelineDeadLetterInput,
        checkpoint: Option<String>,
        attempt: u32,
    ) -> DataResult<()> {
        run.rows_dead_letter = run.rows_dead_letter.saturating_add(1);
        self.dead_letter_store.push_dead_letter(PipelineDeadLetter {
            run_id: run.id.clone(),
            pipeline_id: run.pipeline_id.clone(),
            stage: input.stage,
            reason: input.reason,
            record: input.record,
            checkpoint,
            attempt,
            timestamp_unix_ms: now_unix_ms(),
        })
    }
}
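
// End-to-end wiring sketch (illustrative; it assumes the caller already has a
// `QueryContext`, whose constructor lives elsewhere in this crate):
//
//     fn run_once(context: QueryContext) -> DataResult<PipelineRun> {
//         let pipeline = Pipeline::new("demo", "Demo", "memory", "passthrough", "memory-sink");
//         let source = Arc::new(InMemoryRecordSource::new("memory", Vec::new()));
//         let sink = Arc::new(IdempotentInMemorySink::new("memory-sink", "id"));
//         let runtime = PipelineRuntime::new(
//             pipeline,
//             source,
//             Arc::new(PassthroughTransform),
//             sink,
//             Arc::new(InMemoryCheckpointStore::default()),
//             Arc::new(InMemoryDeadLetterStore::default()),
//             Arc::new(InMemoryPipelineEventStore::default()),
//         );
//         runtime.execute_manual(context, PipelineControl::default(), Some("ops".to_string()))
//     }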

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineOperationSnapshot {
    pub run: PipelineRun,
    pub events: Vec<PipelineEvent>,
    pub dead_letters: Vec<PipelineDeadLetter>,
}

#[derive(Debug, Clone)]
pub struct InMemoryRecordSource {
    source_name: String,
    records: Vec<Row>,
    lineage: Option<String>,
    freshness_unix_ms: Option<u64>,
}

impl InMemoryRecordSource {
    pub fn new(source_name: impl Into<String>, records: Vec<Row>) -> Self {
        Self {
            source_name: source_name.into(),
            records,
            lineage: None,
            freshness_unix_ms: Some(now_unix_ms()),
        }
    }

    pub fn with_lineage(mut self, lineage: impl Into<String>) -> Self {
        self.lineage = Some(lineage.into());
        self
    }

    pub fn with_freshness_unix_ms(mut self, freshness_unix_ms: u64) -> Self {
        self.freshness_unix_ms = Some(freshness_unix_ms);
        self
    }
}

impl Source for InMemoryRecordSource {
    fn source_name(&self) -> &str {
        &self.source_name
    }

    fn read_batch(
        &self,
        pipeline: &Pipeline,
        checkpoint: Option<&str>,
        _context: &QueryContext,
    ) -> DataResult<PipelineBatch> {
        let offset = parse_checkpoint_offset(checkpoint);
        let limit = pipeline.batch_size.max(1);
        let total_rows = self.records.len();
        let rows = self
            .records
            .iter()
            .skip(offset)
            .take(limit)
            .cloned()
            .collect::<Vec<_>>();
        let next_offset = offset.saturating_add(rows.len());
        Ok(PipelineBatch {
            records: rows,
            next_checkpoint: Some(next_offset.to_string()),
            has_more: next_offset < total_rows,
            lineage: self.lineage.clone(),
            source_freshness_unix_ms: self.freshness_unix_ms,
        })
    }
}
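
// Checkpoint semantics for the source above (descriptive): the checkpoint is a
// stringified row offset, so a pipeline with `batch_size = 2` over five
// records yields checkpoints "2", "4", "5", with `has_more` false only on the
// final batch.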

#[derive(Debug, Clone)]
pub struct FileJsonLineSource {
    source_name: String,
    path: PathBuf,
}

impl FileJsonLineSource {
    pub fn new(source_name: impl Into<String>, path: impl Into<PathBuf>) -> Self {
        Self {
            source_name: source_name.into(),
            path: path.into(),
        }
    }
}

impl Source for FileJsonLineSource {
    fn source_name(&self) -> &str {
        &self.source_name
    }

    fn read_batch(
        &self,
        pipeline: &Pipeline,
        checkpoint: Option<&str>,
        _context: &QueryContext,
    ) -> DataResult<PipelineBatch> {
        if !self.path.exists() {
            return Ok(PipelineBatch {
                records: Vec::new(),
                next_checkpoint: Some(parse_checkpoint_offset(checkpoint).to_string()),
                has_more: false,
                lineage: Some(format!("file://{}", self.path.display())),
                source_freshness_unix_ms: None,
            });
        }

        let file = OpenOptions::new().read(true).open(&self.path)?;
        let reader = BufReader::new(file);
        let mut records = Vec::new();
        for line in reader.lines() {
            let line = line?;
            if line.trim().is_empty() {
                continue;
            }
            let value: Value = serde_json::from_str(&line)?;
            let Value::Object(map) = value else {
                return Err(DataError::Integration(format!(
                    "file source expected JSON object row in {}",
                    self.path.display()
                )));
            };
            records.push(BTreeMap::from_iter(map));
        }

        let offset = parse_checkpoint_offset(checkpoint);
        let limit = pipeline.batch_size.max(1);
        let total_rows = records.len();
        let rows = records
            .into_iter()
            .skip(offset)
            .take(limit)
            .collect::<Vec<_>>();
        let next_offset = offset.saturating_add(rows.len());
        let freshness_unix_ms = fs::metadata(&self.path)
            .ok()
            .and_then(|metadata| metadata.modified().ok())
            .and_then(system_time_to_unix_ms);
        Ok(PipelineBatch {
            records: rows,
            next_checkpoint: Some(next_offset.to_string()),
            has_more: next_offset < total_rows,
            lineage: Some(format!("file://{}", self.path.display())),
            source_freshness_unix_ms: freshness_unix_ms,
        })
    }
}

#[derive(Debug, Clone, Default)]
pub struct InMemoryObjectStore {
    inner: Arc<Mutex<HashMap<String, Vec<Value>>>>,
}

impl InMemoryObjectStore {
    pub fn put(&self, key: &str, payload: Vec<Value>) -> DataResult<()> {
        let mut guard = self
            .inner
            .lock()
            .map_err(|_| DataError::Integration("object store lock poisoned".to_string()))?;
        guard.insert(key.to_string(), payload);
        Ok(())
    }

    pub fn get(&self, key: &str) -> DataResult<Vec<Value>> {
        let guard = self
            .inner
            .lock()
            .map_err(|_| DataError::Integration("object store lock poisoned".to_string()))?;
        Ok(guard.get(key).cloned().unwrap_or_default())
    }
}

#[derive(Debug, Clone)]
pub struct ObjectStoreJsonSource {
    source_name: String,
    object_store: InMemoryObjectStore,
    object_key: String,
}

impl ObjectStoreJsonSource {
    pub fn new(
        source_name: impl Into<String>,
        object_store: InMemoryObjectStore,
        object_key: impl Into<String>,
    ) -> Self {
        Self {
            source_name: source_name.into(),
            object_store,
            object_key: object_key.into(),
        }
    }
}

impl Source for ObjectStoreJsonSource {
    fn source_name(&self) -> &str {
        &self.source_name
    }

    fn read_batch(
        &self,
        pipeline: &Pipeline,
        checkpoint: Option<&str>,
        _context: &QueryContext,
    ) -> DataResult<PipelineBatch> {
        let offset = parse_checkpoint_offset(checkpoint);
        let limit = pipeline.batch_size.max(1);
        let objects = self.object_store.get(&self.object_key)?;
        let total_rows = objects.len();
        let mut rows = Vec::new();
        for value in objects.into_iter().skip(offset).take(limit) {
            let Value::Object(map) = value else {
                return Err(DataError::Integration(format!(
                    "object store source expected JSON object rows for key `{}`",
                    self.object_key
                )));
            };
            rows.push(BTreeMap::from_iter(map));
        }
        let next_offset = offset.saturating_add(rows.len());
        Ok(PipelineBatch {
            records: rows,
            next_checkpoint: Some(next_offset.to_string()),
            has_more: next_offset < total_rows,
            lineage: Some(format!("object://{}", self.object_key)),
            source_freshness_unix_ms: Some(now_unix_ms()),
        })
    }
}

#[derive(Debug)]
struct IdempotentSinkState {
    key_field: String,
    rows: Vec<Row>,
    seen: HashSet<String>,
}

#[derive(Clone)]
pub struct IdempotentInMemorySink {
    sink_name: String,
    state: Arc<Mutex<IdempotentSinkState>>,
    probe: Option<Arc<dyn Fn(&QueryContext) -> DataResult<()> + Send + Sync>>,
}

impl std::fmt::Debug for IdempotentInMemorySink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IdempotentInMemorySink")
            .field("sink_name", &self.sink_name)
            .finish_non_exhaustive()
    }
}

impl IdempotentInMemorySink {
    pub fn new(sink_name: impl Into<String>, key_field: impl Into<String>) -> Self {
        Self {
            sink_name: sink_name.into(),
            state: Arc::new(Mutex::new(IdempotentSinkState {
                key_field: key_field.into(),
                rows: Vec::new(),
                seen: HashSet::new(),
            })),
            probe: None,
        }
    }

    pub fn with_probe(
        mut self,
        probe: Arc<dyn Fn(&QueryContext) -> DataResult<()> + Send + Sync>,
    ) -> Self {
        self.probe = Some(probe);
        self
    }

    pub fn rows(&self) -> DataResult<Vec<Row>> {
        let guard = self
            .state
            .lock()
            .map_err(|_| DataError::Integration("sink state lock poisoned".to_string()))?;
        Ok(guard.rows.clone())
    }

    fn accept_records(&self, records: &[Row]) -> DataResult<(Vec<Row>, usize)> {
        let mut guard = self
            .state
            .lock()
            .map_err(|_| DataError::Integration("sink state lock poisoned".to_string()))?;
        let key_field = guard.key_field.clone();
        let mut accepted = Vec::new();
        let mut duplicate_writes = 0usize;
        for record in records {
            let key = record
                .get(&key_field)
                .map(Value::to_string)
                .unwrap_or_else(|| serde_json::to_string(record).unwrap_or_default());
            if !guard.seen.insert(key) {
                duplicate_writes = duplicate_writes.saturating_add(1);
                continue;
            }
            guard.rows.push(record.clone());
            accepted.push(record.clone());
        }
        Ok((accepted, duplicate_writes))
    }
}

impl Sink for IdempotentInMemorySink {
    fn sink_name(&self) -> &str {
        &self.sink_name
    }

    fn write_batch(
        &self,
        _pipeline: &Pipeline,
        _run: &PipelineRun,
        records: &[Row],
        context: &QueryContext,
    ) -> DataResult<SinkWriteResult> {
        if let Some(probe) = &self.probe {
            probe(context)?;
        }
        let (accepted, duplicate_writes) = self.accept_records(records)?;
        Ok(SinkWriteResult {
            written: accepted.len(),
            duplicate_writes,
            dead_letters: Vec::new(),
        })
    }
}
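
// Idempotency sketch (illustrative): writing the same keyed record twice is
// counted as a duplicate rather than a second row. This assumes `Row` is the
// string-keyed map of `serde_json::Value` that the rest of this file treats it
// as.
//
//     let sink = IdempotentInMemorySink::new("demo", "id");
//     // first write_batch of a record whose "id" is 1 -> written = 1, duplicate_writes = 0
//     // second write_batch of the same record          -> written = 0, duplicate_writes = 1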

#[derive(Debug, Clone)]
pub struct FileJsonLineSink {
    sink_name: String,
    path: PathBuf,
    dedupe: IdempotentInMemorySink,
}

impl FileJsonLineSink {
    pub fn new(
        sink_name: impl Into<String>,
        path: impl Into<PathBuf>,
        key_field: impl Into<String>,
    ) -> Self {
        Self {
            sink_name: sink_name.into(),
            path: path.into(),
            dedupe: IdempotentInMemorySink::new("file-jsonline-dedupe", key_field),
        }
    }
}

impl Sink for FileJsonLineSink {
    fn sink_name(&self) -> &str {
        &self.sink_name
    }

    fn write_batch(
        &self,
        _pipeline: &Pipeline,
        _run: &PipelineRun,
        records: &[Row],
        context: &QueryContext,
    ) -> DataResult<SinkWriteResult> {
        if let Some(probe) = &self.dedupe.probe {
            probe(context)?;
        }
        let (accepted, duplicate_writes) = self.dedupe.accept_records(records)?;
        if accepted.is_empty() {
            return Ok(SinkWriteResult {
                written: 0,
                duplicate_writes,
                dead_letters: Vec::new(),
            });
        }

        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        for row in &accepted {
            let json = serde_json::to_string(row)?;
            file.write_all(json.as_bytes())?;
            file.write_all(b"\n")?;
        }
        Ok(SinkWriteResult {
            written: accepted.len(),
            duplicate_writes,
            dead_letters: Vec::new(),
        })
    }
}
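
// Note on the file sink above (descriptive): its deduplication state lives in
// memory via `IdempotentInMemorySink`, so keys written during a previous
// process lifetime are not remembered after a restart; the JSONL file itself
// only ever receives appends.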

#[derive(Debug, Clone)]
pub struct ObjectStoreJsonSink {
    sink_name: String,
    object_store: InMemoryObjectStore,
    object_key: String,
    dedupe: IdempotentInMemorySink,
}

impl ObjectStoreJsonSink {
    pub fn new(
        sink_name: impl Into<String>,
        object_store: InMemoryObjectStore,
        object_key: impl Into<String>,
        key_field: impl Into<String>,
    ) -> Self {
        Self {
            sink_name: sink_name.into(),
            object_store,
            object_key: object_key.into(),
            dedupe: IdempotentInMemorySink::new("object-json-dedupe", key_field),
        }
    }
}

impl Sink for ObjectStoreJsonSink {
    fn sink_name(&self) -> &str {
        &self.sink_name
    }

    fn write_batch(
        &self,
        _pipeline: &Pipeline,
        _run: &PipelineRun,
        records: &[Row],
        context: &QueryContext,
    ) -> DataResult<SinkWriteResult> {
        if let Some(probe) = &self.dedupe.probe {
            probe(context)?;
        }
        let (accepted, duplicate_writes) = self.dedupe.accept_records(records)?;
        if accepted.is_empty() {
            return Ok(SinkWriteResult {
                written: 0,
                duplicate_writes,
                dead_letters: Vec::new(),
            });
        }

        let mut payload = self.object_store.get(&self.object_key)?;
        for row in &accepted {
            payload.push(serde_json::to_value(row)?);
        }
        self.object_store.put(&self.object_key, payload)?;
        Ok(SinkWriteResult {
            written: accepted.len(),
            duplicate_writes,
            dead_letters: Vec::new(),
        })
    }
}

#[derive(Debug, Clone)]
pub struct SingleStorePipelineSource<A: SingleStoreAdapter + ?Sized> {
    source_name: String,
    adapter: Arc<A>,
    query: SqlCommand,
    dataset: String,
    contract: AdapterCallContract,
    artificial_delay_ms: u64,
}

impl<A: SingleStoreAdapter + ?Sized> SingleStorePipelineSource<A> {
    pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
        Self {
            source_name: "singlestore".to_string(),
            adapter,
            query,
            dataset: dataset.into(),
            contract: AdapterCallContract::default_query(),
            artificial_delay_ms: 0,
        }
    }

    pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
        self.contract = contract;
        self
    }

    pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
        self.artificial_delay_ms = artificial_delay_ms;
        self
    }
}

impl<A: SingleStoreAdapter + ?Sized + Send + Sync> Source for SingleStorePipelineSource<A> {
    fn source_name(&self) -> &str {
        &self.source_name
    }

    fn read_batch(
        &self,
        pipeline: &Pipeline,
        checkpoint: Option<&str>,
        context: &QueryContext,
    ) -> DataResult<PipelineBatch> {
        let offset = parse_checkpoint_offset(checkpoint);
        let request = DataWindowRequest {
            dataset: self.dataset.clone(),
            offset,
            limit: pipeline.batch_size.max(1),
            query_token: None,
            window_token: checkpoint.map(ToString::to_string),
            wire_format: WireFormatProfile::Json,
        };
        let response = map_integration_result(
            "singlestore.pipeline_source",
            run_with_contract(
                "singlestore",
                "pipeline_source.read_batch",
                self.contract,
                context,
                |_| {
                    if self.artificial_delay_ms > 0 {
                        thread::sleep(Duration::from_millis(self.artificial_delay_ms));
                    }
                    self.adapter.run_high_volume_window_query(
                        self.query.clone(),
                        request.clone(),
                        context,
                    )
                },
            ),
        )?;
        let next_offset = response.offset.saturating_add(response.rows.len());
        Ok(PipelineBatch {
            records: response.rows,
            next_checkpoint: Some(next_offset.to_string()),
            has_more: next_offset < response.total_rows,
            lineage: Some(response.query_token),
            source_freshness_unix_ms: Some(now_unix_ms()),
        })
    }
}
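
// Adapter-backed source sketch (illustrative; it assumes an `adapter` of type
// `Arc<impl SingleStoreAdapter>` and a `query: SqlCommand` built elsewhere, and
// the dataset name is made up for the example):
//
//     let source = SingleStorePipelineSource::new(adapter, query, "analytics.orders")
//         .with_contract(AdapterCallContract::default_query())
//         .with_artificial_delay_ms(0);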
1563
1564#[derive(Debug, Clone)]
1565pub struct ClickHousePipelineSource<A: ClickHouseAdapter + ?Sized> {
1566    source_name: String,
1567    adapter: Arc<A>,
1568    query: SqlCommand,
1569    dataset: String,
1570    contract: AdapterCallContract,
1571    artificial_delay_ms: u64,
1572}
1573
1574impl<A: ClickHouseAdapter + ?Sized> ClickHousePipelineSource<A> {
1575    pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
1576        Self {
1577            source_name: "clickhouse".to_string(),
1578            adapter,
1579            query,
1580            dataset: dataset.into(),
1581            contract: AdapterCallContract::default_query(),
1582            artificial_delay_ms: 0,
1583        }
1584    }
1585
1586    pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1587        self.contract = contract;
1588        self
1589    }
1590
1591    pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
1592        self.artificial_delay_ms = artificial_delay_ms;
1593        self
1594    }
1595}
1596
1597impl<A: ClickHouseAdapter + ?Sized + Send + Sync> Source for ClickHousePipelineSource<A> {
1598    fn source_name(&self) -> &str {
1599        &self.source_name
1600    }
1601
1602    fn read_batch(
1603        &self,
1604        pipeline: &Pipeline,
1605        checkpoint: Option<&str>,
1606        context: &QueryContext,
1607    ) -> DataResult<PipelineBatch> {
1608        let offset = parse_checkpoint_offset(checkpoint);
1609        let request = DataWindowRequest {
1610            dataset: self.dataset.clone(),
1611            offset,
1612            limit: pipeline.batch_size.max(1),
1613            query_token: None,
1614            window_token: checkpoint.map(ToString::to_string),
1615            wire_format: WireFormatProfile::Json,
1616        };
1617        let response = map_integration_result(
1618            "clickhouse.pipeline_source",
1619            run_with_contract(
1620                "clickhouse",
1621                "pipeline_source.read_batch",
1622                self.contract,
1623                context,
1624                |_| {
1625                    if self.artificial_delay_ms > 0 {
1626                        thread::sleep(Duration::from_millis(self.artificial_delay_ms));
1627                    }
1628                    self.adapter.run_high_volume_window_query(
1629                        self.query.clone(),
1630                        request.clone(),
1631                        context,
1632                    )
1633                },
1634            ),
1635        )?;
1636        let next_offset = response.offset.saturating_add(response.rows.len());
1637        Ok(PipelineBatch {
1638            records: response.rows,
1639            next_checkpoint: Some(next_offset.to_string()),
1640            has_more: next_offset < response.total_rows,
1641            lineage: Some(response.query_token),
1642            source_freshness_unix_ms: Some(now_unix_ms()),
1643        })
1644    }
1645}
1646
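/// Pipeline source that windows a BigQuery query using the same offset-based
/// checkpoint scheme as the other SQL-backed sources.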
1647#[derive(Debug, Clone)]
1648pub struct BigQueryPipelineSource<A: BigQueryAdapter + ?Sized> {
1649    source_name: String,
1650    adapter: Arc<A>,
1651    query: SqlCommand,
1652    dataset: String,
1653    contract: AdapterCallContract,
1654    artificial_delay_ms: u64,
1655}
1656
1657impl<A: BigQueryAdapter + ?Sized> BigQueryPipelineSource<A> {
1658    pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
1659        Self {
1660            source_name: "bigquery".to_string(),
1661            adapter,
1662            query,
1663            dataset: dataset.into(),
1664            contract: AdapterCallContract::default_query(),
1665            artificial_delay_ms: 0,
1666        }
1667    }
1668
1669    pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1670        self.contract = contract;
1671        self
1672    }
1673
1674    pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
1675        self.artificial_delay_ms = artificial_delay_ms;
1676        self
1677    }
1678}
1679
1680impl<A: BigQueryAdapter + ?Sized + Send + Sync> Source for BigQueryPipelineSource<A> {
1681    fn source_name(&self) -> &str {
1682        &self.source_name
1683    }
1684
1685    fn read_batch(
1686        &self,
1687        pipeline: &Pipeline,
1688        checkpoint: Option<&str>,
1689        context: &QueryContext,
1690    ) -> DataResult<PipelineBatch> {
1691        let offset = parse_checkpoint_offset(checkpoint);
1692        let request = DataWindowRequest {
1693            dataset: self.dataset.clone(),
1694            offset,
1695            limit: pipeline.batch_size.max(1),
1696            query_token: None,
1697            window_token: checkpoint.map(ToString::to_string),
1698            wire_format: WireFormatProfile::Json,
1699        };
1700        let response = map_integration_result(
1701            "bigquery.pipeline_source",
1702            run_with_contract(
1703                "bigquery",
1704                "pipeline_source.read_batch",
1705                self.contract,
1706                context,
1707                |_| {
1708                    if self.artificial_delay_ms > 0 {
1709                        thread::sleep(Duration::from_millis(self.artificial_delay_ms));
1710                    }
1711                    self.adapter.run_high_volume_window_query(
1712                        self.query.clone(),
1713                        request.clone(),
1714                        context,
1715                    )
1716                },
1717            ),
1718        )?;
1719        let next_offset = response.offset.saturating_add(response.rows.len());
1720        Ok(PipelineBatch {
1721            records: response.rows,
1722            next_checkpoint: Some(next_offset.to_string()),
1723            has_more: next_offset < response.total_rows,
1724            lineage: Some(response.query_token),
1725            source_freshness_unix_ms: Some(now_unix_ms()),
1726        })
1727    }
1728}
1729
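/// Pipeline source that pages an OpenSearch index via `search_window`,
/// reusing the configured `SearchRequest` as the template for every window.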
1730#[derive(Debug, Clone)]
1731pub struct OpenSearchPipelineSource<A: OpenSearchAdapter + ?Sized> {
1732    source_name: String,
1733    adapter: Arc<A>,
1734    request_template: SearchRequest,
1735    contract: AdapterCallContract,
1736    artificial_delay_ms: u64,
1737}
1738
1739impl<A: OpenSearchAdapter + ?Sized> OpenSearchPipelineSource<A> {
1740    pub fn new(adapter: Arc<A>, request_template: SearchRequest) -> Self {
1741        Self {
1742            source_name: "opensearch".to_string(),
1743            adapter,
1744            request_template,
1745            contract: AdapterCallContract::default_query(),
1746            artificial_delay_ms: 0,
1747        }
1748    }
1749
1750    pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1751        self.contract = contract;
1752        self
1753    }
1754
1755    pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
1756        self.artificial_delay_ms = artificial_delay_ms;
1757        self
1758    }
1759}
1760
1761impl<A: OpenSearchAdapter + ?Sized + Send + Sync> Source for OpenSearchPipelineSource<A> {
1762    fn source_name(&self) -> &str {
1763        &self.source_name
1764    }
1765
1766    fn read_batch(
1767        &self,
1768        pipeline: &Pipeline,
1769        checkpoint: Option<&str>,
1770        context: &QueryContext,
1771    ) -> DataResult<PipelineBatch> {
1772        let offset = parse_checkpoint_offset(checkpoint);
1773        let window = DataWindowRequest {
1774            dataset: self.request_template.index.clone(),
1775            offset,
1776            limit: pipeline.batch_size.max(1),
1777            query_token: None,
1778            window_token: checkpoint.map(ToString::to_string),
1779            wire_format: WireFormatProfile::Json,
1780        };
1781        let response = map_integration_result(
1782            "opensearch.pipeline_source",
1783            run_with_contract(
1784                "opensearch",
1785                "pipeline_source.read_batch",
1786                self.contract,
1787                context,
1788                |_| {
1789                    if self.artificial_delay_ms > 0 {
1790                        thread::sleep(Duration::from_millis(self.artificial_delay_ms));
1791                    }
1792                    self.adapter.search_window(
1793                        self.request_template.clone(),
1794                        window.clone(),
1795                        context,
1796                    )
1797                },
1798            ),
1799        )?;
1800        let next_offset = response.offset.saturating_add(response.rows.len());
1801        Ok(PipelineBatch {
1802            records: response.rows,
1803            next_checkpoint: Some(next_offset.to_string()),
1804            has_more: next_offset < response.total_rows,
1805            lineage: Some(response.query_token),
1806            source_freshness_unix_ms: Some(now_unix_ms()),
1807        })
1808    }
1809}
1810
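/// Sink that probes SingleStore connectivity with a lightweight statement
/// (`SELECT 1` by default) before delegating the actual write to an
/// idempotent in-memory sink keyed on `key_field`.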
1811#[derive(Clone)]
1812pub struct SingleStorePipelineSink<A: SingleStoreAdapter + ?Sized> {
1813    adapter: Arc<A>,
1814    probe_statement: String,
1815    contract: AdapterCallContract,
1816    inner: IdempotentInMemorySink,
1817}
1818
1819impl<A: SingleStoreAdapter + ?Sized> std::fmt::Debug for SingleStorePipelineSink<A> {
1820    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1821        f.debug_struct("SingleStorePipelineSink")
1822            .finish_non_exhaustive()
1823    }
1824}
1825
1826impl<A: SingleStoreAdapter + ?Sized> SingleStorePipelineSink<A> {
1827    pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
1828        Self {
1829            adapter,
1830            probe_statement: "SELECT 1".to_string(),
1831            contract: AdapterCallContract::default_query(),
1832            inner: IdempotentInMemorySink::new("singlestore-pipeline-sink", key_field),
1833        }
1834    }
1835
1836    pub fn with_probe_statement(mut self, statement: impl Into<String>) -> Self {
1837        self.probe_statement = statement.into();
1838        self
1839    }
1840
1841    pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1842        self.contract = contract;
1843        self
1844    }
1845}
1846
1847impl<A: SingleStoreAdapter + ?Sized + Send + Sync> Sink for SingleStorePipelineSink<A> {
1848    fn sink_name(&self) -> &str {
1849        "singlestore"
1850    }
1851
1852    fn write_batch(
1853        &self,
1854        pipeline: &Pipeline,
1855        run: &PipelineRun,
1856        records: &[Row],
1857        context: &QueryContext,
1858    ) -> DataResult<SinkWriteResult> {
1859        map_integration_result(
1860            "singlestore.pipeline_sink",
1861            run_with_contract(
1862                "singlestore",
1863                "pipeline_sink.write_batch",
1864                self.contract,
1865                context,
1866                |_| {
1867                    self.adapter
1868                        .run_query(
1869                            SqlCommand::new(self.probe_statement.clone(), Vec::new()),
1870                            context,
1871                        )
1872                        .map(|_| ())
1873                },
1874            ),
1875        )?;
1876        self.inner.write_batch(pipeline, run, records, context)
1877    }
1878}
1879
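/// ClickHouse counterpart of `SingleStorePipelineSink`: runs the probe
/// statement through the adapter, then writes via the idempotent inner sink.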
1880#[derive(Clone)]
1881pub struct ClickHousePipelineSink<A: ClickHouseAdapter + ?Sized> {
1882    adapter: Arc<A>,
1883    probe_statement: String,
1884    contract: AdapterCallContract,
1885    inner: IdempotentInMemorySink,
1886}
1887
1888impl<A: ClickHouseAdapter + ?Sized> std::fmt::Debug for ClickHousePipelineSink<A> {
1889    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1890        f.debug_struct("ClickHousePipelineSink")
1891            .finish_non_exhaustive()
1892    }
1893}
1894
1895impl<A: ClickHouseAdapter + ?Sized> ClickHousePipelineSink<A> {
1896    pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
1897        Self {
1898            adapter,
1899            probe_statement: "SELECT 1".to_string(),
1900            contract: AdapterCallContract::default_query(),
1901            inner: IdempotentInMemorySink::new("clickhouse-pipeline-sink", key_field),
1902        }
1903    }
1904}
1905
1906impl<A: ClickHouseAdapter + ?Sized + Send + Sync> Sink for ClickHousePipelineSink<A> {
1907    fn sink_name(&self) -> &str {
1908        "clickhouse"
1909    }
1910
1911    fn write_batch(
1912        &self,
1913        pipeline: &Pipeline,
1914        run: &PipelineRun,
1915        records: &[Row],
1916        context: &QueryContext,
1917    ) -> DataResult<SinkWriteResult> {
1918        map_integration_result(
1919            "clickhouse.pipeline_sink",
1920            run_with_contract(
1921                "clickhouse",
1922                "pipeline_sink.write_batch",
1923                self.contract,
1924                context,
1925                |_| {
1926                    self.adapter
1927                        .run_query(
1928                            SqlCommand::new(self.probe_statement.clone(), Vec::new()),
1929                            context,
1930                        )
1931                        .map(|_| ())
1932                },
1933            ),
1934        )?;
1935        self.inner.write_batch(pipeline, run, records, context)
1936    }
1937}
1938
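/// BigQuery sink with the same probe-then-idempotent-write behavior as the
/// other SQL-backed sinks.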
1939#[derive(Clone)]
1940pub struct BigQueryPipelineSink<A: BigQueryAdapter + ?Sized> {
1941    adapter: Arc<A>,
1942    probe_statement: String,
1943    contract: AdapterCallContract,
1944    inner: IdempotentInMemorySink,
1945}
1946
1947impl<A: BigQueryAdapter + ?Sized> std::fmt::Debug for BigQueryPipelineSink<A> {
1948    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1949        f.debug_struct("BigQueryPipelineSink")
1950            .finish_non_exhaustive()
1951    }
1952}
1953
1954impl<A: BigQueryAdapter + ?Sized> BigQueryPipelineSink<A> {
1955    pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
1956        Self {
1957            adapter,
1958            probe_statement: "SELECT 1".to_string(),
1959            contract: AdapterCallContract::default_query(),
1960            inner: IdempotentInMemorySink::new("bigquery-pipeline-sink", key_field),
1961        }
1962    }
1963}
1964
1965impl<A: BigQueryAdapter + ?Sized + Send + Sync> Sink for BigQueryPipelineSink<A> {
1966    fn sink_name(&self) -> &str {
1967        "bigquery"
1968    }
1969
1970    fn write_batch(
1971        &self,
1972        pipeline: &Pipeline,
1973        run: &PipelineRun,
1974        records: &[Row],
1975        context: &QueryContext,
1976    ) -> DataResult<SinkWriteResult> {
1977        map_integration_result(
1978            "bigquery.pipeline_sink",
1979            run_with_contract(
1980                "bigquery",
1981                "pipeline_sink.write_batch",
1982                self.contract,
1983                context,
1984                |_| {
1985                    self.adapter
1986                        .run_query(
1987                            SqlCommand::new(self.probe_statement.clone(), Vec::new()),
1988                            context,
1989                        )
1990                        .map(|_| ())
1991                },
1992            ),
1993        )?;
1994        self.inner.write_batch(pipeline, run, records, context)
1995    }
1996}
1997
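/// OpenSearch sink that issues a probe search against `probe_index` before
/// handing records to the idempotent inner sink.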
1998#[derive(Clone)]
1999pub struct OpenSearchPipelineSink<A: OpenSearchAdapter + ?Sized> {
2000    adapter: Arc<A>,
2001    probe_request: SearchRequest,
2002    contract: AdapterCallContract,
2003    inner: IdempotentInMemorySink,
2004}
2005
2006impl<A: OpenSearchAdapter + ?Sized> std::fmt::Debug for OpenSearchPipelineSink<A> {
2007    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2008        f.debug_struct("OpenSearchPipelineSink")
2009            .finish_non_exhaustive()
2010    }
2011}
2012
2013impl<A: OpenSearchAdapter + ?Sized> OpenSearchPipelineSink<A> {
2014    pub fn new(
2015        adapter: Arc<A>,
2016        key_field: impl Into<String>,
2017        probe_index: impl Into<String>,
2018    ) -> Self {
2019        Self {
2020            adapter,
2021            probe_request: SearchRequest::new(probe_index, ""),
2022            contract: AdapterCallContract::default_query(),
2023            inner: IdempotentInMemorySink::new("opensearch-pipeline-sink", key_field),
2024        }
2025    }
2026}
2027
2028impl<A: OpenSearchAdapter + ?Sized + Send + Sync> Sink for OpenSearchPipelineSink<A> {
2029    fn sink_name(&self) -> &str {
2030        "opensearch"
2031    }
2032
2033    fn write_batch(
2034        &self,
2035        pipeline: &Pipeline,
2036        run: &PipelineRun,
2037        records: &[Row],
2038        context: &QueryContext,
2039    ) -> DataResult<SinkWriteResult> {
2040        map_integration_result(
2041            "opensearch.pipeline_sink",
2042            run_with_contract(
2043                "opensearch",
2044                "pipeline_sink.write_batch",
2045                self.contract,
2046                context,
2047                |_| {
2048                    self.adapter
2049                        .search(self.probe_request.clone(), context)
2050                        .map(|_| ())
2051                },
2052            ),
2053        )?;
2054        self.inner.write_batch(pipeline, run, records, context)
2055    }
2056}
2057
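/// Outcome of a single conformance probe, named `<driver>.<probe>`
/// (for example `clickhouse.checkpoint_resume`).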
2058#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
2059pub struct PipelineConformanceCheck {
2060    pub name: String,
2061    pub passed: bool,
2062    pub detail: String,
2063}
2064
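/// Aggregate result of the adapter conformance suite; it passes only when
/// every individual check passed.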
2065#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
2066pub struct PipelineConformanceReport {
2067    pub checks: Vec<PipelineConformanceCheck>,
2068}
2069
2070impl PipelineConformanceReport {
2071    pub fn passed(&self) -> bool {
2072        self.checks.iter().all(|check| check.passed)
2073    }
2074
2075    fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
2076        self.checks.push(PipelineConformanceCheck {
2077            name: name.into(),
2078            passed: true,
2079            detail: detail.into(),
2080        });
2081    }
2082
2083    fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
2084        self.checks.push(PipelineConformanceCheck {
2085            name: name.into(),
2086            passed: false,
2087            detail: detail.into(),
2088        });
2089    }
2090}
2091
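/// Runs checkpoint-resume, retry, and timeout probes against the four adapter
/// families and collects the outcomes into one report.
///
/// A minimal invocation sketch, assuming the crate's in-memory adapters;
/// `sql_rows` and `search_rows` stand in for pre-seeded fixture data, and the
/// block is marked `ignore` because it is illustrative only:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let report = run_pipeline_adapter_conformance_suite(
///     Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
///     Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
///     Arc::new(InMemoryBigQueryAdapter::new(sql_rows.clone())),
///     Arc::new(InMemoryOpenSearchAdapter::new(search_rows)),
///     &QueryContext::default(),
/// );
/// for check in &report.checks {
///     println!("{}: passed={} ({})", check.name, check.passed, check.detail);
/// }
/// ```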
2092pub fn run_pipeline_adapter_conformance_suite(
2093    singlestore: Arc<dyn SingleStoreAdapter>,
2094    clickhouse: Arc<dyn ClickHouseAdapter>,
2095    bigquery: Arc<dyn BigQueryAdapter>,
2096    opensearch: Arc<dyn OpenSearchAdapter>,
2097    context: &QueryContext,
2098) -> PipelineConformanceReport {
2099    let mut report = PipelineConformanceReport::default();
2100
2101    let sql_query = SqlCommand::new(
2102        "SELECT tenant, active_accounts FROM tenant_rollup",
2103        Vec::new(),
2104    );
2105    let search_request = SearchRequest::new("accounts", "");
2106
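    // Checkpoint probes: force a paused single-batch first run, then resume
    // and repeat against the same checkpoint, dead-letter, and event stores.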
2107    let mut checkpoint_resume_checks = Vec::new();
2108    checkpoint_resume_checks.push(run_checkpoint_resume_probe(
2109        "singlestore",
2110        Pipeline::new(
2111            "m60-singlestore-checkpoint",
2112            "M60 SingleStore Checkpoint",
2113            "singlestore",
2114            "passthrough",
2115            "idempotent_sink",
2116        ),
2117        Arc::new(SingleStorePipelineSource::new(
2118            singlestore.clone(),
2119            sql_query.clone(),
2120            "tenant_rollup",
2121        )),
2122        Arc::new(IdempotentInMemorySink::new(
2123            "m60-sink-singlestore",
2124            "tenant",
2125        )),
2126        context.clone(),
2127    ));
2128    checkpoint_resume_checks.push(run_checkpoint_resume_probe(
2129        "clickhouse",
2130        Pipeline::new(
2131            "m60-clickhouse-checkpoint",
2132            "M60 ClickHouse Checkpoint",
2133            "clickhouse",
2134            "passthrough",
2135            "idempotent_sink",
2136        ),
2137        Arc::new(ClickHousePipelineSource::new(
2138            clickhouse.clone(),
2139            sql_query.clone(),
2140            "tenant_rollup",
2141        )),
2142        Arc::new(IdempotentInMemorySink::new("m60-sink-clickhouse", "tenant")),
2143        context.clone(),
2144    ));
2145    checkpoint_resume_checks.push(run_checkpoint_resume_probe(
2146        "bigquery",
2147        Pipeline::new(
2148            "m60-bigquery-checkpoint",
2149            "M60 BigQuery Checkpoint",
2150            "bigquery",
2151            "passthrough",
2152            "idempotent_sink",
2153        ),
2154        Arc::new(BigQueryPipelineSource::new(
2155            bigquery.clone(),
2156            sql_query.clone(),
2157            "tenant_rollup",
2158        )),
2159        Arc::new(IdempotentInMemorySink::new("m60-sink-bigquery", "tenant")),
2160        context.clone(),
2161    ));
2162    checkpoint_resume_checks.push(run_checkpoint_resume_probe(
2163        "opensearch",
2164        Pipeline::new(
2165            "m60-opensearch-checkpoint",
2166            "M60 OpenSearch Checkpoint",
2167            "opensearch",
2168            "passthrough",
2169            "idempotent_sink",
2170        ),
2171        Arc::new(OpenSearchPipelineSource::new(
2172            opensearch.clone(),
2173            search_request.clone(),
2174        )),
2175        Arc::new(IdempotentInMemorySink::new(
2176            "m60-sink-opensearch",
2177            "account",
2178        )),
2179        context.clone(),
2180    ));
2181
2182    for (driver, passed, detail) in checkpoint_resume_checks {
2183        if passed {
2184            report.add_pass(format!("{driver}.checkpoint_resume"), detail);
2185        } else {
2186            report.add_fail(format!("{driver}.checkpoint_resume"), detail);
2187        }
2188    }
2189
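    // Retry probes: the same sources feed a sink whose first write attempt
    // fails transiently (see run_retry_probe below).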
2190    let mut retry_checks = Vec::new();
2191    retry_checks.push(run_retry_probe(
2192        "singlestore",
2193        Arc::new(SingleStorePipelineSource::new(
2194            singlestore.clone(),
2195            sql_query.clone(),
2196            "tenant_rollup",
2197        )),
2198        context.clone(),
2199    ));
2200    retry_checks.push(run_retry_probe(
2201        "clickhouse",
2202        Arc::new(ClickHousePipelineSource::new(
2203            clickhouse.clone(),
2204            sql_query.clone(),
2205            "tenant_rollup",
2206        )),
2207        context.clone(),
2208    ));
2209    retry_checks.push(run_retry_probe(
2210        "bigquery",
2211        Arc::new(BigQueryPipelineSource::new(
2212            bigquery.clone(),
2213            sql_query.clone(),
2214            "tenant_rollup",
2215        )),
2216        context.clone(),
2217    ));
2218    retry_checks.push(run_retry_probe(
2219        "opensearch",
2220        Arc::new(OpenSearchPipelineSource::new(
2221            opensearch.clone(),
2222            search_request.clone(),
2223        )),
2224        context.clone(),
2225    ));
2226
2227    for (driver, passed, detail) in retry_checks {
2228        if passed {
2229            report.add_pass(format!("{driver}.retry"), detail);
2230        } else {
2231            report.add_fail(format!("{driver}.retry"), detail);
2232        }
2233    }
2234
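    // Timeout probes: each source sleeps for 5 ms while the context allows
    // only 1 ms, so the adapter contract is expected to report a timeout.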
2235    let mut timeout_checks = Vec::new();
2236    timeout_checks.push(run_timeout_probe(
2237        "singlestore",
2238        Arc::new(
2239            SingleStorePipelineSource::new(singlestore, sql_query.clone(), "tenant_rollup")
2240                .with_artificial_delay_ms(5),
2241        ),
2242        context.clone().with_timeout_ms(1),
2243    ));
2244    timeout_checks.push(run_timeout_probe(
2245        "clickhouse",
2246        Arc::new(
2247            ClickHousePipelineSource::new(clickhouse, sql_query.clone(), "tenant_rollup")
2248                .with_artificial_delay_ms(5),
2249        ),
2250        context.clone().with_timeout_ms(1),
2251    ));
2252    timeout_checks.push(run_timeout_probe(
2253        "bigquery",
2254        Arc::new(
2255            BigQueryPipelineSource::new(bigquery, sql_query, "tenant_rollup")
2256                .with_artificial_delay_ms(5),
2257        ),
2258        context.clone().with_timeout_ms(1),
2259    ));
2260    timeout_checks.push(run_timeout_probe(
2261        "opensearch",
2262        Arc::new(
2263            OpenSearchPipelineSource::new(opensearch, search_request).with_artificial_delay_ms(5),
2264        ),
2265        context.clone().with_timeout_ms(1),
2266    ));
2267
2268    for (driver, passed, detail) in timeout_checks {
2269        if passed {
2270            report.add_pass(format!("{driver}.timeout"), detail);
2271        } else {
2272            report.add_fail(format!("{driver}.timeout"), detail);
2273        }
2274    }
2275
2276    report
2277}
2278
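/// Checkpoint-resume probe for one driver: the first run is capped at a
/// single batch and must pause with a saved checkpoint, the resumed run must
/// succeed, and a repeated run must write zero rows (idempotence).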
2279fn run_checkpoint_resume_probe(
2280    driver: &str,
2281    base_pipeline: Pipeline,
2282    source: Arc<dyn Source>,
2283    sink: Arc<dyn Sink>,
2284    context: QueryContext,
2285) -> (String, bool, String) {
2286    let checkpoint_store = Arc::new(InMemoryCheckpointStore::default());
2287    let dead_letters = Arc::new(InMemoryDeadLetterStore::default());
2288    let events = Arc::new(InMemoryPipelineEventStore::default());
2289    let first_runtime = PipelineRuntime::new(
2290        base_pipeline
2291            .clone()
2292            .with_batch_size(1)
2293            .with_max_batches_per_run(Some(1)),
2294        source.clone(),
2295        Arc::new(PassthroughTransform),
2296        sink.clone(),
2297        checkpoint_store.clone(),
2298        dead_letters.clone(),
2299        events.clone(),
2300    );
2301    let first = first_runtime.execute_manual(context.clone(), PipelineControl::default(), None);
2302    let Ok(first_run) = first else {
2303        return (
2304            driver.to_string(),
2305            false,
2306            format!("initial run failed: {}", first.unwrap_err()),
2307        );
2308    };
2309    if first_run.status != PipelineRunStatus::Paused {
2310        return (
2311            driver.to_string(),
2312            false,
2313            format!("expected first status paused, got {:?}", first_run.status),
2314        );
2315    }
2316    if first_run.last_checkpoint.is_none() {
2317        return (
2318            driver.to_string(),
2319            false,
2320            "first run did not save checkpoint".to_string(),
2321        );
2322    }
2323
2324    let second_runtime = PipelineRuntime::new(
2325        base_pipeline.with_batch_size(1),
2326        source,
2327        Arc::new(PassthroughTransform),
2328        sink.clone(),
2329        checkpoint_store,
2330        dead_letters,
2331        events,
2332    );
2333    let second = second_runtime.execute_manual(context.clone(), PipelineControl::default(), None);
2334    let Ok(second_run) = second else {
2335        return (
2336            driver.to_string(),
2337            false,
2338            format!("resume run failed: {}", second.unwrap_err()),
2339        );
2340    };
2341    if second_run.status != PipelineRunStatus::Succeeded {
2342        return (
2343            driver.to_string(),
2344            false,
2345            format!(
2346                "expected resume status succeeded, got {:?}",
2347                second_run.status
2348            ),
2349        );
2350    }
2351
2352    let third = second_runtime.execute_manual(context, PipelineControl::default(), None);
2353    let Ok(third_run) = third else {
2354        return (
2355            driver.to_string(),
2356            false,
2357            format!("repeat run failed: {}", third.unwrap_err()),
2358        );
2359    };
2360    if third_run.rows_written != 0 {
2361        return (
2362            driver.to_string(),
2363            false,
2364            format!(
2365                "expected idempotent repeat run writes=0 got {}",
2366                third_run.rows_written
2367            ),
2368        );
2369    }
2370
2371    (
2372        driver.to_string(),
2373        true,
2374        format!(
2375            "resume status={:?}, repeat writes={}",
2376            second_run.status, third_run.rows_written
2377        ),
2378    )
2379}
2380
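/// Retry probe: the sink's write probe fails on the first attempt with a
/// transient error, so the check passes when the run succeeds with at least
/// one retry recorded.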
2381fn run_retry_probe(
2382    driver: &str,
2383    source: Arc<dyn Source>,
2384    context: QueryContext,
2385) -> (String, bool, String) {
2386    let attempts = Arc::new(AtomicUsize::new(0));
2387    let attempts_clone = attempts.clone();
2388    let sink = Arc::new(
2389        IdempotentInMemorySink::new(format!("m60-flaky-{driver}"), "tenant").with_probe(Arc::new(
2390            move |_ctx| {
2391                let attempt = attempts_clone.fetch_add(1, Ordering::SeqCst);
2392                if attempt == 0 {
2393                    return Err(DataError::Integration(
2394                        "transient sink probe failure".to_string(),
2395                    ));
2396                }
2397                Ok(())
2398            },
2399        )),
2400    );
2401    let runtime = PipelineRuntime::new(
2402        Pipeline::new(
2403            format!("m60-{driver}-retry"),
2404            format!("M60 {driver} Retry Probe"),
2405            driver,
2406            "passthrough",
2407            "flaky_sink",
2408        )
2409        .with_batch_size(1)
2410        .with_retry_policy(RetryPolicy {
2411            max_attempts: 2,
2412            initial_backoff_ms: 0,
2413            max_backoff_ms: 0,
2414        }),
2415        source,
2416        Arc::new(PassthroughTransform),
2417        sink,
2418        Arc::new(InMemoryCheckpointStore::default()),
2419        Arc::new(InMemoryDeadLetterStore::default()),
2420        Arc::new(InMemoryPipelineEventStore::default()),
2421    );
2422    match runtime.execute_manual(context, PipelineControl::default(), None) {
2423        Ok(run) if run.status == PipelineRunStatus::Succeeded && run.retries >= 1 => {
2424            (driver.to_string(), true, format!("retries={}", run.retries))
2425        }
2426        Ok(run) => (
2427            driver.to_string(),
2428            false,
2429            format!(
2430                "unexpected run status={:?} retries={}",
2431                run.status, run.retries
2432            ),
2433        ),
2434        Err(err) => (
2435            driver.to_string(),
2436            false,
2437            format!("retry probe failed: {err}"),
2438        ),
2439    }
2440}
2441
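/// Timeout probe: the caller supplies a deliberately slow source and a short
/// context timeout; the check passes when the run fails and its errors
/// mention a timeout.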
2442fn run_timeout_probe(
2443    driver: &str,
2444    source: Arc<dyn Source>,
2445    context: QueryContext,
2446) -> (String, bool, String) {
2447    let runtime = PipelineRuntime::new(
2448        Pipeline::new(
2449            format!("m60-{driver}-timeout"),
2450            format!("M60 {driver} Timeout Probe"),
2451            driver,
2452            "passthrough",
2453            "noop_sink",
2454        )
2455        .with_batch_size(1)
2456        .with_deadline_ms(1_000)
2457        .with_retry_policy(RetryPolicy::never()),
2458        source,
2459        Arc::new(PassthroughTransform),
2460        Arc::new(IdempotentInMemorySink::new("m60-timeout-sink", "tenant")),
2461        Arc::new(InMemoryCheckpointStore::default()),
2462        Arc::new(InMemoryDeadLetterStore::default()),
2463        Arc::new(InMemoryPipelineEventStore::default()),
2464    );
2465    match runtime.execute_manual(context, PipelineControl::default(), None) {
2466        Ok(run) => {
2467            let detail = run.errors.join(" | ");
2468            let passed = run.status == PipelineRunStatus::Failed
2469                && detail.to_ascii_lowercase().contains("timeout");
2470            (
2471                driver.to_string(),
2472                passed,
2473                if passed {
2474                    "timeout classification observed".to_string()
2475                } else {
2476                    format!(
2477                        "expected failed timeout run, got status={:?} errors={detail}",
2478                        run.status
2479                    )
2480                },
2481            )
2482        }
2483        Err(err) => (driver.to_string(), false, format!("runtime failed: {err}")),
2484    }
2485}
2486
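/// Parses a checkpoint string into a row offset, accepting either a bare
/// integer (`"7"`) or an embedded `offset=<n>` token such as
/// `"query:offset=9:limit=1"`; anything else falls back to 0.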
2487fn parse_checkpoint_offset(checkpoint: Option<&str>) -> usize {
2488    let Some(raw) = checkpoint else {
2489        return 0;
2490    };
2491    let trimmed = raw.trim();
2492    if trimmed.is_empty() {
2493        return 0;
2494    }
2495    if let Ok(value) = trimmed.parse::<usize>() {
2496        return value;
2497    }
2498    if let Some(value) = trimmed
2499        .split("offset=")
2500        .nth(1)
2501        .and_then(|tail| tail.split(':').next())
2502        .and_then(|value| value.parse::<usize>().ok())
2503    {
2504        return value;
2505    }
2506    0
2507}
2508
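/// Classifies an error as retryable: integration or query errors whose
/// message mentions "transient", "timeout", "unavailable", or "retry", plus
/// all I/O errors.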
2509fn is_retryable_pipeline_error(err: &DataError) -> bool {
2510    match err {
2511        DataError::Integration(message) | DataError::Query(message) => {
2512            let normalized = message.to_ascii_lowercase();
2513            normalized.contains("transient")
2514                || normalized.contains("timeout")
2515                || normalized.contains("unavailable")
2516                || normalized.contains("retry")
2517        }
2518        DataError::Io(_) => true,
2519        _ => false,
2520    }
2521}
2522
2523fn now_unix_ms() -> u64 {
2524    SystemTime::now()
2525        .duration_since(UNIX_EPOCH)
2526        .unwrap_or(Duration::from_secs(0))
2527        .as_millis() as u64
2528}
2529
2530fn system_time_to_unix_ms(value: SystemTime) -> Option<u64> {
2531    value
2532        .duration_since(UNIX_EPOCH)
2533        .ok()
2534        .map(|duration| duration.as_millis() as u64)
2535}
2536
2537fn next_counter() -> u64 {
2538    static RUN_COUNTER: AtomicU64 = AtomicU64::new(1);
2539    RUN_COUNTER.fetch_add(1, Ordering::Relaxed)
2540}
2541
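/// Serializes rows as newline-delimited JSON, one object per line with a
/// trailing newline.
///
/// A minimal sketch, assuming `Row` collects from `(String, Value)` pairs as
/// the tests below do (marked `ignore` because it is illustrative only):
///
/// ```ignore
/// let row: Row = [("id".to_string(), serde_json::json!(1))].into_iter().collect();
/// let lines = pipeline_records_to_json_lines(&[row]).unwrap();
/// assert!(lines.ends_with('\n'));
/// ```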
2542pub fn pipeline_records_to_json_lines(records: &[Row]) -> DataResult<String> {
2543    let mut out = String::new();
2544    for row in records {
2545        out.push_str(&serde_json::to_string(row)?);
2546        out.push('\n');
2547    }
2548    Ok(out)
2549}
2550
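/// Serializes the records as JSON lines and writes them to `path`,
/// overwriting any existing file contents.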
2551pub fn write_pipeline_records_to_file(path: &Path, records: &[Row]) -> DataResult<()> {
2552    let payload = pipeline_records_to_json_lines(records)?;
2553    fs::write(path, payload)?;
2554    Ok(())
2555}
2556
2557#[cfg(test)]
2558mod tests {
2559    use super::*;
2560    use crate::{
2561        InMemoryBigQueryAdapter, InMemoryClickHouseAdapter, InMemoryOpenSearchAdapter,
2562        InMemorySingleStoreAdapter, StoredRow,
2563    };
2564    use serde_json::json;
2565    use std::collections::VecDeque;
2566
2567    fn row(pairs: &[(&str, Value)]) -> Row {
2568        pairs
2569            .iter()
2570            .map(|(key, value)| ((*key).to_string(), value.clone()))
2571            .collect()
2572    }
2573
2574    #[derive(Debug, Clone)]
2575    struct SequenceSource {
2576        source_name: String,
2577        responses: Arc<Mutex<VecDeque<DataResult<PipelineBatch>>>>,
2578    }
2579
2580    impl SequenceSource {
2581        fn new(
2582            source_name: impl Into<String>,
2583            responses: impl IntoIterator<Item = DataResult<PipelineBatch>>,
2584        ) -> Self {
2585            Self {
2586                source_name: source_name.into(),
2587                responses: Arc::new(Mutex::new(responses.into_iter().collect())),
2588            }
2589        }
2590    }
2591
2592    impl Source for SequenceSource {
2593        fn source_name(&self) -> &str {
2594            &self.source_name
2595        }
2596
2597        fn read_batch(
2598            &self,
2599            _pipeline: &Pipeline,
2600            _checkpoint: Option<&str>,
2601            _context: &QueryContext,
2602        ) -> DataResult<PipelineBatch> {
2603            let mut guard = self
2604                .responses
2605                .lock()
2606                .map_err(|_| DataError::Integration("sequence source lock poisoned".to_string()))?;
2607            Ok(guard
2608                .pop_front()
2609                .transpose()?
2610                .unwrap_or_else(PipelineBatch::empty))
2611        }
2612    }
2613
2614    #[derive(Debug, Clone)]
2615    struct AlwaysFailTransform {
2616        message: String,
2617    }
2618
2619    impl Transform for AlwaysFailTransform {
2620        fn transform_name(&self) -> &str {
2621            "always-fail"
2622        }
2623
2624        fn transform_batch(
2625            &self,
2626            _pipeline: &Pipeline,
2627            _run: &PipelineRun,
2628            _records: Vec<Row>,
2629            _context: &QueryContext,
2630        ) -> DataResult<TransformBatch> {
2631            Err(DataError::Validation(self.message.clone()))
2632        }
2633    }
2634
2635    #[derive(Debug, Clone)]
2636    struct AlwaysFailSink {
2637        sink_name: String,
2638        message: String,
2639    }
2640
2641    impl Sink for AlwaysFailSink {
2642        fn sink_name(&self) -> &str {
2643            &self.sink_name
2644        }
2645
2646        fn write_batch(
2647            &self,
2648            _pipeline: &Pipeline,
2649            _run: &PipelineRun,
2650            _records: &[Row],
2651            _context: &QueryContext,
2652        ) -> DataResult<SinkWriteResult> {
2653            Err(DataError::Validation(self.message.clone()))
2654        }
2655    }
2656
2657    #[test]
2658    fn pipeline_runtime_resume_checkpoint_and_idempotent_sink_prevents_duplicates() {
2659        let source = Arc::new(InMemoryRecordSource::new(
2660            "in-memory",
2661            vec![
2662                row(&[("id", json!(1)), ("account", json!("Acme"))]),
2663                row(&[("id", json!(2)), ("account", json!("Globex"))]),
2664                row(&[("id", json!(3)), ("account", json!("Umbrella"))]),
2665            ],
2666        ));
2667        let sink = Arc::new(IdempotentInMemorySink::new("idempotent", "id"));
2668        let checkpoints = Arc::new(InMemoryCheckpointStore::default());
2669        let dead_letters = Arc::new(InMemoryDeadLetterStore::default());
2670        let events = Arc::new(InMemoryPipelineEventStore::default());
2671        let context = QueryContext::default().with_tenant_id("tenant-a");
2672
2673        let first = PipelineRuntime::new(
2674            Pipeline::new(
2675                "pipeline-resume",
2676                "Resume Pipeline",
2677                "source",
2678                "transform",
2679                "sink",
2680            )
2681            .with_batch_size(1)
2682            .with_max_batches_per_run(Some(1)),
2683            source.clone(),
2684            Arc::new(PassthroughTransform),
2685            sink.clone(),
2686            checkpoints.clone(),
2687            dead_letters.clone(),
2688            events.clone(),
2689        )
2690        .execute_manual(context.clone(), PipelineControl::default(), None)
2691        .unwrap();
2692
2693        assert_eq!(first.status, PipelineRunStatus::Paused);
2694        assert_eq!(first.rows_written, 1);
2695        assert_eq!(sink.rows().unwrap().len(), 1);
2696
2697        let resumed = PipelineRuntime::new(
2698            Pipeline::new(
2699                "pipeline-resume",
2700                "Resume Pipeline",
2701                "source",
2702                "transform",
2703                "sink",
2704            )
2705            .with_batch_size(1),
2706            source.clone(),
2707            Arc::new(PassthroughTransform),
2708            sink.clone(),
2709            checkpoints.clone(),
2710            dead_letters.clone(),
2711            events.clone(),
2712        )
2713        .execute_manual(context.clone(), PipelineControl::default(), None)
2714        .unwrap();
2715
2716        assert_eq!(resumed.status, PipelineRunStatus::Succeeded);
2717        assert_eq!(sink.rows().unwrap().len(), 3);
2718
2719        let repeat = PipelineRuntime::new(
2720            Pipeline::new(
2721                "pipeline-resume",
2722                "Resume Pipeline",
2723                "source",
2724                "transform",
2725                "sink",
2726            )
2727            .with_batch_size(1),
2728            source,
2729            Arc::new(PassthroughTransform),
2730            sink.clone(),
2731            checkpoints,
2732            dead_letters,
2733            events,
2734        )
2735        .execute_manual(context, PipelineControl::default(), None)
2736        .unwrap();
2737        assert_eq!(repeat.rows_written, 0);
2738        assert_eq!(sink.rows().unwrap().len(), 3);
2739    }
2740
2741    #[derive(Debug, Clone)]
2742    struct StatusAwareTransform;
2743
2744    impl Transform for StatusAwareTransform {
2745        fn transform_name(&self) -> &str {
2746            "status-aware"
2747        }
2748
2749        fn transform_batch(
2750            &self,
2751            _pipeline: &Pipeline,
2752            _run: &PipelineRun,
2753            records: Vec<Row>,
2754            _context: &QueryContext,
2755        ) -> DataResult<TransformBatch> {
2756            let mut ok = Vec::new();
2757            let mut dead_letters = Vec::new();
2758            for record in records {
2759                if record
2760                    .get("status")
2761                    .and_then(Value::as_str)
2762                    .is_some_and(|status| status == "bad")
2763                {
2764                    dead_letters.push(PipelineDeadLetterInput {
2765                        stage: PipelineStage::Transform,
2766                        reason: "blocked status".to_string(),
2767                        record,
2768                    });
2769                } else {
2770                    ok.push(record);
2771                }
2772            }
2773            Ok(TransformBatch {
2774                records: ok,
2775                dead_letters,
2776            })
2777        }
2778    }
2779
2780    #[test]
2781    fn pipeline_runtime_routes_transform_dead_letters_and_tracks_counts() {
2782        let runtime = PipelineRuntime::new(
2783            Pipeline::new(
2784                "pipeline-deadletter",
2785                "Deadletter Pipeline",
2786                "source",
2787                "status-aware",
2788                "sink",
2789            )
2790            .with_batch_size(10),
2791            Arc::new(InMemoryRecordSource::new(
2792                "in-memory",
2793                vec![
2794                    row(&[("id", json!(1)), ("status", json!("ok"))]),
2795                    row(&[("id", json!(2)), ("status", json!("bad"))]),
2796                    row(&[("id", json!(3)), ("status", json!("ok"))]),
2797                ],
2798            )),
2799            Arc::new(StatusAwareTransform),
2800            Arc::new(IdempotentInMemorySink::new("sink", "id")),
2801            Arc::new(InMemoryCheckpointStore::default()),
2802            Arc::new(InMemoryDeadLetterStore::default()),
2803            Arc::new(InMemoryPipelineEventStore::default()),
2804        );
2805
2806        let run = runtime
2807            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
2808            .unwrap();
2809        assert_eq!(run.status, PipelineRunStatus::Succeeded);
2810        assert_eq!(run.rows_dead_letter, 1);
2811
2812        let snapshot = runtime.operation_snapshot(&run).unwrap();
2813        assert_eq!(snapshot.dead_letters.len(), 1);
2814        assert!(snapshot.dead_letters[0].reason.contains("blocked status"));
2815    }
2816
2817    #[test]
2818    fn pipeline_runtime_respects_operator_pause_and_cancel_controls() {
2819        let runtime = PipelineRuntime::new(
2820            Pipeline::new(
2821                "pipeline-control",
2822                "Control Pipeline",
2823                "source",
2824                "transform",
2825                "sink",
2826            )
2827            .with_batch_size(1),
2828            Arc::new(InMemoryRecordSource::new(
2829                "in-memory",
2830                vec![
2831                    row(&[("id", json!(1))]),
2832                    row(&[("id", json!(2))]),
2833                    row(&[("id", json!(3))]),
2834                ],
2835            )),
2836            Arc::new(PassthroughTransform),
2837            Arc::new(IdempotentInMemorySink::new("sink", "id")),
2838            Arc::new(InMemoryCheckpointStore::default()),
2839            Arc::new(InMemoryDeadLetterStore::default()),
2840            Arc::new(InMemoryPipelineEventStore::default()),
2841        );
2842
2843        let paused_control = PipelineControl::default();
2844        paused_control.pause();
2845        let paused = runtime
2846            .execute_manual(QueryContext::default(), paused_control, None)
2847            .unwrap();
2848        assert_eq!(paused.status, PipelineRunStatus::Paused);
2849        assert_eq!(paused.rows_read, 0);
2850
2851        let canceled_control = PipelineControl::default();
2852        canceled_control.cancel();
2853        let canceled = runtime
2854            .execute_manual(QueryContext::default(), canceled_control, None)
2855            .unwrap();
2856        assert_eq!(canceled.status, PipelineRunStatus::Canceled);
2857        assert_eq!(canceled.rows_read, 0);
2858    }
2859
2860    #[test]
2861    fn file_and_object_store_profiles_roundtrip_rows() {
2862        let tmp = std::env::temp_dir().join(format!("shelly_m60_file_{}.jsonl", now_unix_ms()));
2863        let records = vec![
2864            row(&[("id", json!(1)), ("account", json!("Acme"))]),
2865            row(&[("id", json!(2)), ("account", json!("Globex"))]),
2866        ];
2867        write_pipeline_records_to_file(&tmp, &records).unwrap();
2868
2869        let file_source = FileJsonLineSource::new("file-source", &tmp);
2870        let file_batch = file_source
2871            .read_batch(
2872                &Pipeline::new("file", "file", "file", "transform", "sink").with_batch_size(10),
2873                None,
2874                &QueryContext::default(),
2875            )
2876            .unwrap();
2877        assert_eq!(file_batch.records.len(), 2);
2878
2879        let object_store = InMemoryObjectStore::default();
2880        object_store
2881            .put(
2882                "accounts",
2883                records
2884                    .iter()
2885                    .map(|row| serde_json::to_value(row).unwrap())
2886                    .collect(),
2887            )
2888            .unwrap();
2889        let object_source =
2890            ObjectStoreJsonSource::new("object-source", object_store.clone(), "accounts");
2891        let object_batch = object_source
2892            .read_batch(
2893                &Pipeline::new("obj", "obj", "obj", "transform", "sink").with_batch_size(10),
2894                None,
2895                &QueryContext::default(),
2896            )
2897            .unwrap();
2898        assert_eq!(object_batch.records.len(), 2);
2899
2900        let object_sink =
2901            ObjectStoreJsonSink::new("object-sink", object_store.clone(), "sink_rows", "id");
2902        let run = PipelineRun::new(
2903            "run-1".to_string(),
2904            "obj".to_string(),
2905            PipelineRunCommand::Manual { requested_by: None },
2906            &QueryContext::default(),
2907        );
2908        let write = object_sink
2909            .write_batch(
2910                &Pipeline::new("obj", "obj", "obj", "transform", "sink"),
2911                &run,
2912                &object_batch.records,
2913                &QueryContext::default(),
2914            )
2915            .unwrap();
2916        assert_eq!(write.written, 2);
2917        let stored = object_store.get("sink_rows").unwrap();
2918        assert_eq!(stored.len(), 2);
2919
2920        let _ = fs::remove_file(tmp);
2921    }
2922
2923    #[test]
2924    fn pipeline_builder_and_helper_paths_cover_edge_cases() {
2925        let pipeline = Pipeline::new("pipeline-helpers", "Helpers", "source", "transform", "sink")
2926            .with_trigger(PipelineTrigger::EventDriven)
2927            .with_batch_size(0)
2928            .with_max_batches_per_run(Some(0))
2929            .with_deadline_ms(0)
2930            .with_retry_policy(RetryPolicy {
2931                max_attempts: 0,
2932                initial_backoff_ms: 10,
2933                max_backoff_ms: 1,
2934            })
2935            .with_metadata("region", "us-east-1");
2936        assert_eq!(pipeline.trigger, PipelineTrigger::EventDriven);
2937        assert_eq!(pipeline.batch_size, 1);
2938        assert_eq!(pipeline.max_batches_per_run, Some(1));
2939        assert_eq!(pipeline.deadline_ms, 1);
2940        assert_eq!(pipeline.retry_policy.max_attempts, 1);
2941        assert_eq!(pipeline.retry_policy.initial_backoff_ms, 10);
2942        assert_eq!(pipeline.retry_policy.max_backoff_ms, 10);
2943        assert_eq!(
2944            pipeline.metadata.get("region").map(String::as_str),
2945            Some("us-east-1")
2946        );
2947
2948        assert_eq!(
2949            PipelineRunCommand::Manual { requested_by: None }.as_label(),
2950            "manual"
2951        );
2952        assert_eq!(
2953            PipelineRunCommand::Scheduled {
2954                schedule_id: "cron".to_string()
2955            }
2956            .as_label(),
2957            "scheduled"
2958        );
2959        assert_eq!(
2960            PipelineRunCommand::Event {
2961                event: "ingest".to_string(),
2962                payload: json!({"ok": true}),
2963            }
2964            .as_label(),
2965            "event"
2966        );
2967
2968        let mut run = PipelineRun::new(
2969            "run-helpers".to_string(),
2970            pipeline.id.clone(),
2971            PipelineRunCommand::Manual {
2972                requested_by: Some("operator".to_string()),
2973            },
2974            &QueryContext::default().with_tenant_id("tenant-a"),
2975        );
2976        assert_eq!(run.status, PipelineRunStatus::Queued);
2977        run.finish(PipelineRunStatus::Succeeded);
2978        assert_eq!(run.status, PipelineRunStatus::Succeeded);
2979        assert!(run.finished_at_unix_ms.is_some());
2980
2981        let empty_batch = PipelineBatch::empty();
2982        assert!(empty_batch.records.is_empty());
2983        assert!(!empty_batch.has_more);
2984        let passthrough = TransformBatch::passthrough(vec![row(&[("id", json!(1))])]);
2985        assert_eq!(passthrough.records.len(), 1);
2986        assert!(passthrough.dead_letters.is_empty());
2987
2988        let control = PipelineControl::default();
2989        assert!(!control.is_paused());
2990        assert!(!control.is_canceled());
2991        control.pause();
2992        assert!(control.is_paused());
2993        control.resume();
2994        assert!(!control.is_paused());
2995        control.cancel();
2996        assert!(control.is_canceled());
2997
2998        assert_eq!(parse_checkpoint_offset(None), 0);
2999        assert_eq!(parse_checkpoint_offset(Some("")), 0);
3000        assert_eq!(parse_checkpoint_offset(Some("7")), 7);
3001        assert_eq!(parse_checkpoint_offset(Some("query:offset=9:limit=1")), 9);
3002        assert_eq!(parse_checkpoint_offset(Some("offset=abc")), 0);
3003
3004        assert!(is_retryable_pipeline_error(&DataError::Integration(
3005            "transient backend timeout".to_string()
3006        )));
3007        assert!(is_retryable_pipeline_error(&DataError::Query(
3008            "service unavailable".to_string()
3009        )));
3010        assert!(is_retryable_pipeline_error(&DataError::Io(
3011            std::io::Error::other("disk busy")
3012        )));
3013        assert!(!is_retryable_pipeline_error(&DataError::Validation(
3014            "invalid payload".to_string()
3015        )));
3016
3017        let lines = pipeline_records_to_json_lines(&[
3018            row(&[("id", json!(1)), ("name", json!("Acme"))]),
3019            row(&[("id", json!(2)), ("name", json!("Globex"))]),
3020        ])
3021        .unwrap();
3022        assert!(lines.ends_with('\n'));
3023        assert_eq!(lines.lines().count(), 2);
3024    }
3025
3026    #[test]
3027    fn in_memory_pipeline_stores_cover_crud_paths() {
3028        let checkpoints = InMemoryCheckpointStore::default();
3029        checkpoints
3030            .save_checkpoint("pipeline-a", Some("tenant-a"), "11")
3031            .unwrap();
3032        assert_eq!(
3033            checkpoints
3034                .load_checkpoint("pipeline-a", Some("tenant-a"))
3035                .unwrap()
3036                .as_deref(),
3037            Some("11")
3038        );
3039        checkpoints
3040            .clear_checkpoint("pipeline-a", Some("tenant-a"))
3041            .unwrap();
3042        assert!(checkpoints
3043            .load_checkpoint("pipeline-a", Some("tenant-a"))
3044            .unwrap()
3045            .is_none());
3046
3047        let dead_letters = InMemoryDeadLetterStore::default();
3048        dead_letters
3049            .push_dead_letter(PipelineDeadLetter {
3050                run_id: "run-1".to_string(),
3051                pipeline_id: "pipeline-a".to_string(),
3052                stage: PipelineStage::Sink,
3053                reason: "duplicate".to_string(),
3054                record: row(&[("id", json!(1))]),
3055                checkpoint: Some("1".to_string()),
3056                attempt: 1,
3057                timestamp_unix_ms: now_unix_ms(),
3058            })
3059            .unwrap();
3060        dead_letters
3061            .push_dead_letter(PipelineDeadLetter {
3062                run_id: "run-2".to_string(),
3063                pipeline_id: "pipeline-a".to_string(),
3064                stage: PipelineStage::Transform,
3065                reason: "invalid".to_string(),
3066                record: row(&[("id", json!(2))]),
3067                checkpoint: Some("2".to_string()),
3068                attempt: 2,
3069                timestamp_unix_ms: now_unix_ms(),
3070            })
3071            .unwrap();
3072        let run1_letters = dead_letters.list_dead_letters("run-1").unwrap();
3073        assert_eq!(run1_letters.len(), 1);
3074        assert_eq!(run1_letters[0].reason, "duplicate");
3075
3076        let events = InMemoryPipelineEventStore::default();
3077        events
3078            .append_event(PipelineEvent {
3079                run_id: "run-1".to_string(),
3080                pipeline_id: "pipeline-a".to_string(),
3081                timestamp_unix_ms: now_unix_ms(),
3082                kind: PipelineEventKind::RunStarted,
3083                message: "started".to_string(),
3084                tenant_id: Some("tenant-a".to_string()),
3085                trace_id: None,
3086                correlation_id: Some("corr-1".to_string()),
3087                request_id: Some("req-1".to_string()),
3088                metrics: BTreeMap::from_iter([("rows_read".to_string(), 0u64)]),
3089                metadata: BTreeMap::from_iter([("source".to_string(), "in-memory".to_string())]),
3090            })
3091            .unwrap();
3092        events
3093            .append_event(PipelineEvent {
3094                run_id: "run-2".to_string(),
3095                pipeline_id: "pipeline-a".to_string(),
3096                timestamp_unix_ms: now_unix_ms(),
3097                kind: PipelineEventKind::RunSucceeded,
3098                message: "done".to_string(),
3099                tenant_id: Some("tenant-a".to_string()),
3100                trace_id: None,
3101                correlation_id: Some("corr-2".to_string()),
3102                request_id: Some("req-2".to_string()),
3103                metrics: BTreeMap::new(),
3104                metadata: BTreeMap::new(),
3105            })
3106            .unwrap();
3107        let run1_events = events.list_events("run-1").unwrap();
3108        assert_eq!(run1_events.len(), 1);
3109        assert_eq!(run1_events[0].kind, PipelineEventKind::RunStarted);
3110    }
3111
3112    #[test]
3113    fn source_profiles_cover_missing_and_invalid_payload_paths() {
3114        let pipeline = Pipeline::new("source-checks", "Source Checks", "source", "t", "sink")
3115            .with_batch_size(2);
3116
3117        let missing_file =
3118            std::env::temp_dir().join(format!("shelly_missing_{}.jsonl", now_unix_ms()));
3119        let missing_source = FileJsonLineSource::new("missing-file", &missing_file);
3120        let missing_batch = missing_source
3121            .read_batch(&pipeline, Some("offset=3"), &QueryContext::default())
3122            .unwrap();
3123        assert!(missing_batch.records.is_empty());
3124        assert_eq!(missing_batch.next_checkpoint.as_deref(), Some("3"));
3125        assert_eq!(Source::source_name(&missing_source), "missing-file");
3126
3127        let invalid_file =
3128            std::env::temp_dir().join(format!("shelly_invalid_{}.jsonl", now_unix_ms()));
3129        fs::write(&invalid_file, "[1,2,3]\n").unwrap();
3130        let invalid_source = FileJsonLineSource::new("invalid-file", &invalid_file);
3131        let err = invalid_source
3132            .read_batch(&pipeline, None, &QueryContext::default())
3133            .unwrap_err()
3134            .to_string();
3135        assert!(err.contains("expected JSON object row"));
3136        let _ = fs::remove_file(&invalid_file);
3137
3138        let object_store = InMemoryObjectStore::default();
3139        object_store
3140            .put("bad_rows", vec![json!(["not-an-object"])])
3141            .unwrap();
3142        let object_source = ObjectStoreJsonSource::new("object-source", object_store, "bad_rows");
3143        let err = object_source
3144            .read_batch(&pipeline, None, &QueryContext::default())
3145            .unwrap_err()
3146            .to_string();
3147        assert!(err.contains("expected JSON object rows"));
3148    }
3149
3150    #[test]
3151    fn enterprise_pipeline_adapters_cover_success_and_error_paths() {
3152        let sql_rows = vec![
3153            row(&[("tenant", json!("north")), ("active_accounts", json!(42))]),
3154            row(&[("tenant", json!("south")), ("active_accounts", json!(27))]),
3155        ];
3156        let search_rows = vec![
3157            StoredRow {
3158                id: 1,
3159                data: row(&[("account", json!("Acme")), ("status", json!("renewal"))]),
3160            },
3161            StoredRow {
3162                id: 2,
3163                data: row(&[("account", json!("Globex")), ("status", json!("new"))]),
3164            },
3165        ];
3166        let pipeline = Pipeline::new("enterprise", "Enterprise", "source", "transform", "sink")
3167            .with_batch_size(2);
3168        let context = QueryContext::default()
3169            .with_tenant_id("tenant-a")
3170            .with_correlation_id("corr-a")
3171            .with_request_id("req-a");
3172        let query = SqlCommand::new(
3173            "SELECT tenant, active_accounts FROM tenant_rollup",
3174            Vec::new(),
3175        );
3176
3177        let singlestore_source = SingleStorePipelineSource::new(
3178            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
3179            query.clone(),
3180            "tenant_rollup",
3181        )
3182        .with_contract(AdapterCallContract::default_query().with_timeout_ms(5_000))
3183        .with_artificial_delay_ms(0);
3184        let batch = singlestore_source
3185            .read_batch(&pipeline, Some("offset=0"), &context)
3186            .unwrap();
3187        assert_eq!(batch.records.len(), 2);
3188        assert_eq!(Source::source_name(&singlestore_source), "singlestore");
3189
3190        let clickhouse_source = ClickHousePipelineSource::new(
3191            Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
3192            query.clone(),
3193            "tenant_rollup",
3194        )
3195        .with_contract(AdapterCallContract::low_latency().with_timeout_ms(1_000))
3196        .with_artificial_delay_ms(0);
3197        let batch = clickhouse_source
3198            .read_batch(&pipeline, Some("offset=0"), &context)
3199            .unwrap();
3200        assert_eq!(batch.records.len(), 2);
3201
3202        let bigquery_source = BigQueryPipelineSource::new(
3203            Arc::new(InMemoryBigQueryAdapter::new(sql_rows.clone())),
3204            query.clone(),
3205            "tenant_rollup",
3206        )
3207        .with_contract(AdapterCallContract::default_query())
3208        .with_artificial_delay_ms(0);
3209        let batch = bigquery_source
3210            .read_batch(&pipeline, Some("offset=0"), &context)
3211            .unwrap();
3212        assert_eq!(batch.records.len(), 2);
3213
3214        let opensearch_source = OpenSearchPipelineSource::new(
3215            Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
3216            SearchRequest::new("accounts", ""),
3217        )
3218        .with_contract(AdapterCallContract::default_query())
3219        .with_artificial_delay_ms(0);
3220        let batch = opensearch_source
3221            .read_batch(&pipeline, Some("offset=0"), &context)
3222            .unwrap();
3223        assert_eq!(batch.records.len(), 2);
3224
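        // Validation-error paths: an empty SQL statement and an empty search index are
        // both rejected with descriptive errors, as the assertions below verify.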
3225        let err = SingleStorePipelineSource::new(
3226            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
3227            SqlCommand::new("", Vec::new()),
3228            "tenant_rollup",
3229        )
3230        .read_batch(&pipeline, None, &context)
3231        .unwrap_err()
3232        .to_string();
3233        assert!(err.contains("empty SQL statement"));
3234
3235        let err = OpenSearchPipelineSource::new(
3236            Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
3237            SearchRequest::new("", ""),
3238        )
3239        .read_batch(&pipeline, None, &context)
3240        .unwrap_err()
3241        .to_string();
3242        assert!(err.contains("search index must not be empty"));
3243
3244        let run = PipelineRun::new(
3245            "run-1".to_string(),
3246            "enterprise".to_string(),
3247            PipelineRunCommand::Manual { requested_by: None },
3248            &context,
3249        );
3250        let records = vec![
3251            row(&[("id", json!(1)), ("account", json!("Acme"))]),
3252            row(&[("id", json!(1)), ("account", json!("Acme"))]),
3253            row(&[("id", json!(2)), ("account", json!("Globex"))]),
3254        ];
3255
3256        let singlestore_sink = SingleStorePipelineSink::new(
3257            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
3258            "id",
3259        )
3260        .with_probe_statement("SELECT 1")
3261        .with_contract(AdapterCallContract::default_query());
3262        let write = singlestore_sink
3263            .write_batch(&pipeline, &run, &records, &context)
3264            .unwrap();
3265        assert_eq!(write.written, 2);
3266        assert_eq!(write.duplicate_writes, 1);
3267        assert_eq!(Sink::sink_name(&singlestore_sink), "singlestore");
3268        assert!(format!("{singlestore_sink:?}").contains("SingleStorePipelineSink"));
3269
3270        let err = SingleStorePipelineSink::new(
3271            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
3272            "id",
3273        )
3274        .with_probe_statement("")
3275        .write_batch(&pipeline, &run, &records, &context)
3276        .unwrap_err()
3277        .to_string();
3278        assert!(err.contains("empty SQL statement"));
3279
3280        let clickhouse_sink = ClickHousePipelineSink::new(
3281            Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
3282            "id",
3283        );
3284        let write = clickhouse_sink
3285            .write_batch(&pipeline, &run, &records, &context)
3286            .unwrap();
3287        assert_eq!(write.written, 2);
3288        assert_eq!(Sink::sink_name(&clickhouse_sink), "clickhouse");
3289        assert!(format!("{clickhouse_sink:?}").contains("ClickHousePipelineSink"));
3290
3291        let bigquery_sink =
3292            BigQueryPipelineSink::new(Arc::new(InMemoryBigQueryAdapter::new(sql_rows)), "id");
3293        let write = bigquery_sink
3294            .write_batch(&pipeline, &run, &records, &context)
3295            .unwrap();
3296        assert_eq!(write.written, 2);
3297        assert_eq!(Sink::sink_name(&bigquery_sink), "bigquery");
3298        assert!(format!("{bigquery_sink:?}").contains("BigQueryPipelineSink"));
3299
3300        let opensearch_sink = OpenSearchPipelineSink::new(
3301            Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
3302            "id",
3303            "accounts",
3304        );
3305        let write = opensearch_sink
3306            .write_batch(&pipeline, &run, &records, &context)
3307            .unwrap();
3308        assert_eq!(write.written, 2);
3309        assert_eq!(Sink::sink_name(&opensearch_sink), "opensearch");
3310        assert!(format!("{opensearch_sink:?}").contains("OpenSearchPipelineSink"));
3311
3312        let err = OpenSearchPipelineSink::new(
3313            Arc::new(InMemoryOpenSearchAdapter::new(search_rows)),
3314            "id",
3315            "",
3316        )
3317        .write_batch(&pipeline, &run, &records, &context)
3318        .unwrap_err()
3319        .to_string();
3320        assert!(err.contains("search index must not be empty"));
3321    }
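
    // Illustrative sketch: the enterprise test above only asserts `written` for the
    // ClickHouse sink; this assumes its duplicate-write accounting mirrors the
    // SingleStore sink when given the same key column and one repeated id.
    #[test]
    fn clickhouse_sink_counts_duplicate_writes_sketch() {
        let pipeline = Pipeline::new("dedup-sketch", "Dedup Sketch", "source", "t", "sink");
        let context = QueryContext::default().with_tenant_id("tenant-a");
        let run = PipelineRun::new(
            "run-dedup".to_string(),
            "dedup-sketch".to_string(),
            PipelineRunCommand::Manual { requested_by: None },
            &context,
        );
        let records = vec![
            row(&[("id", json!(1)), ("account", json!("Acme"))]),
            row(&[("id", json!(1)), ("account", json!("Acme"))]),
            row(&[("id", json!(2)), ("account", json!("Globex"))]),
        ];
        let backing_rows = vec![row(&[("tenant", json!("north"))])];
        let sink = ClickHousePipelineSink::new(
            Arc::new(InMemoryClickHouseAdapter::new(backing_rows)),
            "id",
        );
        let write = sink
            .write_batch(&pipeline, &run, &records, &context)
            .unwrap();
        assert_eq!(write.written, 2);
        assert_eq!(write.duplicate_writes, 1);
    }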
3322
3323    #[test]
3324    fn conformance_report_tracks_failures() {
3325        let mut report = PipelineConformanceReport::default();
3326        report.add_pass("checkpoint_resume", "ok");
3327        assert!(report.passed());
3328        report.add_fail("retry", "transient error");
3329        assert!(!report.passed());
3330        assert_eq!(report.checks.len(), 2);
3331        assert!(!report.checks[1].passed);
3332    }
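
    // Small follow-on sketch: once any check has failed, later passing checks do not flip
    // the verdict back, assuming passed() requires every recorded check to pass (as the
    // assertions above imply).
    #[test]
    fn conformance_report_failure_is_sticky_sketch() {
        let mut report = PipelineConformanceReport::default();
        report.add_fail("timeout", "not enforced");
        report.add_pass("retry", "ok");
        assert!(!report.passed());
        assert_eq!(report.checks.len(), 2);
        assert!(!report.checks[0].passed);
    }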
3333
3334    #[test]
3335    fn pipeline_adapter_conformance_suite_passes_with_reference_adapters() {
3336        let sql_rows = vec![
3337            row(&[("tenant", json!("north")), ("active_accounts", json!(42))]),
3338            row(&[("tenant", json!("south")), ("active_accounts", json!(27))]),
3339        ];
3340        let search_rows = vec![
3341            StoredRow {
3342                id: 1,
3343                data: row(&[("account", json!("Acme")), ("status", json!("renewal"))]),
3344            },
3345            StoredRow {
3346                id: 2,
3347                data: row(&[("account", json!("Globex")), ("status", json!("new"))]),
3348            },
3349        ];
3350
3351        let report = run_pipeline_adapter_conformance_suite(
3352            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
3353            Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
3354            Arc::new(InMemoryBigQueryAdapter::new(sql_rows)),
3355            Arc::new(InMemoryOpenSearchAdapter::new(search_rows)),
3356            &QueryContext::default()
3357                .with_tenant_id("tenant-a")
3358                .with_correlation_id("m60-correlation")
3359                .with_request_id("m60-request"),
3360        );
3361
3362        assert!(
3363            report.passed(),
3364            "expected conformance suite to pass, got {:?}",
3365            report
3366        );
3367        assert!(report
3368            .checks
3369            .iter()
3370            .any(|check| check.name.ends_with(".timeout")));
3371        assert!(report
3372            .checks
3373            .iter()
3374            .any(|check| check.name.ends_with(".retry")));
3375        assert!(report
3376            .checks
3377            .iter()
3378            .any(|check| check.name.ends_with(".checkpoint_resume")));
3379    }
3380
3381    #[test]
3382    fn pipeline_runtime_command_wrappers_execute_scheduled_and_event() {
3383        let runtime = PipelineRuntime::new(
3384            Pipeline::new(
3385                "pipeline-command-wrappers",
3386                "Pipeline Command Wrappers",
3387                "sequence",
3388                "passthrough",
3389                "sink",
3390            ),
3391            Arc::new(SequenceSource::new(
3392                "sequence",
3393                [Ok(PipelineBatch::empty()), Ok(PipelineBatch::empty())],
3394            )),
3395            Arc::new(PassthroughTransform),
3396            Arc::new(IdempotentInMemorySink::new("sink", "id")),
3397            Arc::new(InMemoryCheckpointStore::default()),
3398            Arc::new(InMemoryDeadLetterStore::default()),
3399            Arc::new(InMemoryPipelineEventStore::default()),
3400        );
3401        let context = QueryContext::default().with_tenant_id("tenant-schedule");
3402
3403        let scheduled = runtime
3404            .execute_scheduled(
3405                context.clone(),
3406                PipelineControl::default(),
3407                "schedule-nightly",
3408            )
3409            .unwrap();
3410        assert_eq!(scheduled.status, PipelineRunStatus::Succeeded);
3411        assert!(matches!(
3412            scheduled.command,
3413            PipelineRunCommand::Scheduled { .. }
3414        ));
3415
3416        let event = runtime
3417            .execute_event(
3418                context,
3419                PipelineControl::default(),
3420                "ingest",
3421                json!({"reason": "manual"}),
3422            )
3423            .unwrap();
3424        assert_eq!(event.status, PipelineRunStatus::Succeeded);
3425        assert!(matches!(event.command, PipelineRunCommand::Event { .. }));
3426    }
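
    // Manual counterpart to the scheduled/event wrappers above; a minimal sketch assuming
    // a single empty batch is enough for a manual run to complete successfully.
    #[test]
    fn pipeline_runtime_manual_run_with_empty_source_sketch() {
        let runtime = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-manual-sketch",
                "Pipeline Manual Sketch",
                "sequence",
                "passthrough",
                "sink",
            ),
            Arc::new(SequenceSource::new(
                "sequence",
                [Ok(PipelineBatch::empty())],
            )),
            Arc::new(PassthroughTransform),
            Arc::new(IdempotentInMemorySink::new("sink", "id")),
            Arc::new(InMemoryCheckpointStore::default()),
            Arc::new(InMemoryDeadLetterStore::default()),
            Arc::new(InMemoryPipelineEventStore::default()),
        );
        let run = runtime
            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
            .unwrap();
        assert_eq!(run.status, PipelineRunStatus::Succeeded);
        assert!(matches!(run.command, PipelineRunCommand::Manual { .. }));
    }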
3427
3428    #[test]
3429    fn pipeline_runtime_covers_source_error_and_deadline_paths() {
3430        let source_failure_runtime = PipelineRuntime::new(
3431            Pipeline::new(
3432                "pipeline-source-failure",
3433                "Pipeline Source Failure",
3434                "sequence",
3435                "passthrough",
3436                "sink",
3437            )
3438            .with_retry_policy(RetryPolicy::never()),
3439            Arc::new(SequenceSource::new(
3440                "sequence",
3441                [Err(DataError::Validation(
3442                    "source validation failure".to_string(),
3443                ))],
3444            )),
3445            Arc::new(PassthroughTransform),
3446            Arc::new(IdempotentInMemorySink::new("sink", "id")),
3447            Arc::new(InMemoryCheckpointStore::default()),
3448            Arc::new(InMemoryDeadLetterStore::default()),
3449            Arc::new(InMemoryPipelineEventStore::default()),
3450        );
3451        let source_failed = source_failure_runtime
3452            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
3453            .unwrap();
3454        assert_eq!(source_failed.status, PipelineRunStatus::Failed);
3455        assert!(source_failed
3456            .errors
3457            .iter()
3458            .any(|error| error.contains("source validation failure")));
3459
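        // Deadline path: a 1 ms budget combined with a sink probe that sleeps for 5 ms
        // guarantees the run overruns its deadline and fails.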
3460        let deadline_runtime = PipelineRuntime::new(
3461            Pipeline::new(
3462                "pipeline-deadline",
3463                "Pipeline Deadline",
3464                "sequence",
3465                "passthrough",
3466                "sink",
3467            )
3468            .with_batch_size(1)
3469            .with_deadline_ms(1),
3470            Arc::new(SequenceSource::new(
3471                "sequence",
3472                [Ok(PipelineBatch {
3473                    records: vec![row(&[("id", json!(1))])],
3474                    next_checkpoint: Some("1".to_string()),
3475                    has_more: true,
3476                    lineage: Some("deadline-test".to_string()),
3477                    source_freshness_unix_ms: Some(now_unix_ms()),
3478                })],
3479            )),
3480            Arc::new(PassthroughTransform),
3481            Arc::new(
3482                IdempotentInMemorySink::new("sink", "id").with_probe(Arc::new(|_context| {
3483                    thread::sleep(Duration::from_millis(5));
3484                    Ok(())
3485                })),
3486            ),
3487            Arc::new(InMemoryCheckpointStore::default()),
3488            Arc::new(InMemoryDeadLetterStore::default()),
3489            Arc::new(InMemoryPipelineEventStore::default()),
3490        );
3491        let deadline_failed = deadline_runtime
3492            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
3493            .unwrap();
3494        assert_eq!(deadline_failed.status, PipelineRunStatus::Failed);
3495        assert!(deadline_failed
3496            .errors
3497            .iter()
3498            .any(|error| error.contains("deadline exceeded")));
3499    }
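
    // Counterpart sketch to the deadline failure above: with the pipeline's default deadline
    // left in place, the same 5 ms probe delay is harmless. Assumes a single has_more=false
    // batch completes the run and that its next_checkpoint becomes the run's last checkpoint.
    #[test]
    fn pipeline_runtime_meets_default_deadline_sketch() {
        let runtime = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-deadline-ok",
                "Pipeline Deadline Ok",
                "sequence",
                "passthrough",
                "sink",
            )
            .with_batch_size(1),
            Arc::new(SequenceSource::new(
                "sequence",
                [Ok(PipelineBatch {
                    records: vec![row(&[("id", json!(1))])],
                    next_checkpoint: Some("1".to_string()),
                    has_more: false,
                    lineage: Some("deadline-ok".to_string()),
                    source_freshness_unix_ms: Some(now_unix_ms()),
                })],
            )),
            Arc::new(PassthroughTransform),
            Arc::new(
                IdempotentInMemorySink::new("sink", "id").with_probe(Arc::new(|_context| {
                    thread::sleep(Duration::from_millis(5));
                    Ok(())
                })),
            ),
            Arc::new(InMemoryCheckpointStore::default()),
            Arc::new(InMemoryDeadLetterStore::default()),
            Arc::new(InMemoryPipelineEventStore::default()),
        );
        let run = runtime
            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
            .unwrap();
        assert_eq!(run.status, PipelineRunStatus::Succeeded);
        assert_eq!(run.last_checkpoint.as_deref(), Some("1"));
    }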
3500
3501    #[test]
3502    fn pipeline_runtime_covers_transform_and_sink_failure_paths() {
3503        let transform_failure_source = Arc::new(SequenceSource::new(
3504            "sequence",
3505            [Ok(PipelineBatch {
3506                records: vec![
3507                    row(&[("id", json!(1)), ("tenant", json!("a"))]),
3508                    row(&[("id", json!(2)), ("tenant", json!("b"))]),
3509                ],
3510                next_checkpoint: Some("2".to_string()),
3511                has_more: false,
3512                lineage: Some("transform-failure".to_string()),
3513                source_freshness_unix_ms: Some(now_unix_ms()),
3514            })],
3515        ));
3516        let transform_failure_dead_letters = Arc::new(InMemoryDeadLetterStore::default());
3517        let transform_failure_runtime = PipelineRuntime::new(
3518            Pipeline::new(
3519                "pipeline-transform-failure",
3520                "Pipeline Transform Failure",
3521                "sequence",
3522                "always-fail",
3523                "sink",
3524            ),
3525            transform_failure_source,
3526            Arc::new(AlwaysFailTransform {
3527                message: "transform denied row".to_string(),
3528            }),
3529            Arc::new(IdempotentInMemorySink::new("sink", "id")),
3530            Arc::new(InMemoryCheckpointStore::default()),
3531            transform_failure_dead_letters.clone(),
3532            Arc::new(InMemoryPipelineEventStore::default()),
3533        );
3534        let transform_failed = transform_failure_runtime
3535            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
3536            .unwrap();
3537        assert_eq!(transform_failed.status, PipelineRunStatus::Failed);
3538        assert_eq!(transform_failed.rows_dead_letter, 2);
3539        assert_eq!(transform_failed.last_checkpoint.as_deref(), Some("2"));
3540        assert!(transform_failed
3541            .errors
3542            .iter()
3543            .any(|error| error.contains("transform denied row")));
3544        let transform_snapshot = transform_failure_runtime
3545            .operation_snapshot(&transform_failed)
3546            .unwrap();
3547        assert_eq!(transform_snapshot.dead_letters.len(), 2);
3548        assert!(transform_snapshot
3549            .dead_letters
3550            .iter()
3551            .all(|dead_letter| dead_letter.stage == PipelineStage::Transform));
3552        assert_eq!(
3553            transform_failure_dead_letters
3554                .list_dead_letters(&transform_failed.id)
3555                .unwrap()
3556                .len(),
3557            2
3558        );
3559
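        // Sink path: failed writes are routed to the dead-letter store rather than failing
        // the run, so the run below still reports Succeeded with zero rows written.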
3560        let sink_failure_source = Arc::new(SequenceSource::new(
3561            "sequence",
3562            [Ok(PipelineBatch {
3563                records: vec![
3564                    row(&[("id", json!(10)), ("tenant", json!("west"))]),
3565                    row(&[("id", json!(11)), ("tenant", json!("east"))]),
3566                ],
3567                next_checkpoint: Some("2".to_string()),
3568                has_more: false,
3569                lineage: Some("sink-failure".to_string()),
3570                source_freshness_unix_ms: Some(now_unix_ms()),
3571            })],
3572        ));
3573        let sink_failure_runtime = PipelineRuntime::new(
3574            Pipeline::new(
3575                "pipeline-sink-failure",
3576                "Pipeline Sink Failure",
3577                "sequence",
3578                "passthrough",
3579                "always-fail",
3580            ),
3581            sink_failure_source,
3582            Arc::new(PassthroughTransform),
3583            Arc::new(AlwaysFailSink {
3584                sink_name: "always-fail".to_string(),
3585                message: "sink rejected rows".to_string(),
3586            }),
3587            Arc::new(InMemoryCheckpointStore::default()),
3588            Arc::new(InMemoryDeadLetterStore::default()),
3589            Arc::new(InMemoryPipelineEventStore::default()),
3590        );
3591        let sink_run = sink_failure_runtime
3592            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
3593            .unwrap();
3594        assert_eq!(sink_run.status, PipelineRunStatus::Succeeded);
3595        assert_eq!(sink_run.rows_written, 0);
3596        assert_eq!(sink_run.rows_dead_letter, 2);
3597        assert!(sink_run
3598            .errors
3599            .iter()
3600            .any(|error| error.contains("sink rejected rows")));
3601        let sink_snapshot = sink_failure_runtime.operation_snapshot(&sink_run).unwrap();
3602        assert_eq!(sink_snapshot.dead_letters.len(), 2);
3603        assert!(sink_snapshot
3604            .dead_letters
3605            .iter()
3606            .all(|dead_letter| dead_letter.stage == PipelineStage::Sink));
3607    }
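
    // Illustrative sketch extending the sink-failure case above: dead letters produced by a
    // failing sink should be retrievable directly from the shared dead-letter store by run
    // id, mirroring the store-level check performed for the transform-failure case.
    #[test]
    fn sink_failure_dead_letters_are_listable_by_run_id_sketch() {
        let dead_letters = Arc::new(InMemoryDeadLetterStore::default());
        let runtime = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-sink-dead-letters",
                "Pipeline Sink Dead Letters",
                "sequence",
                "passthrough",
                "always-fail",
            ),
            Arc::new(SequenceSource::new(
                "sequence",
                [Ok(PipelineBatch {
                    records: vec![row(&[("id", json!(7)), ("tenant", json!("north"))])],
                    next_checkpoint: Some("1".to_string()),
                    has_more: false,
                    lineage: Some("sink-dead-letters".to_string()),
                    source_freshness_unix_ms: Some(now_unix_ms()),
                })],
            )),
            Arc::new(PassthroughTransform),
            Arc::new(AlwaysFailSink {
                sink_name: "always-fail".to_string(),
                message: "sink rejected rows".to_string(),
            }),
            Arc::new(InMemoryCheckpointStore::default()),
            dead_letters.clone(),
            Arc::new(InMemoryPipelineEventStore::default()),
        );
        let run = runtime
            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
            .unwrap();
        assert_eq!(run.status, PipelineRunStatus::Succeeded);
        assert_eq!(run.rows_dead_letter, 1);
        assert_eq!(dead_letters.list_dead_letters(&run.id).unwrap().len(), 1);
    }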
3608}